Cypherpunks.ru repositories - nncp.git/commitdiff
Merge branch 'develop' v6.1.0
author Sergey Matveev <stargrave@stargrave.org>
Wed, 24 Feb 2021 09:26:05 +0000 (12:26 +0300)
committer Sergey Matveev <stargrave@stargrave.org>
Wed, 24 Feb 2021 09:26:05 +0000 (12:26 +0300)
43 files changed:
README
README.RU
doc/admin.texi
doc/call.texi
doc/cfg.texi
doc/cmds.texi
doc/contacts.texi
doc/download.texi
doc/integration.texi
doc/news.ru.texi
doc/news.texi
doc/sendmail.sh [new file with mode: 0755]
doc/spool.texi
makedist.sh
ports/nncp/files/nncp.newsyslog.conf.sample
src/call.go
src/cfg.go
src/check.go
src/cmd/nncp-bundle/main.go
src/cmd/nncp-call/main.go
src/cmd/nncp-caller/main.go
src/cmd/nncp-cfgnew/main.go
src/cmd/nncp-check/main.go
src/cmd/nncp-daemon/main.go
src/cmd/nncp-rm/main.go
src/cmd/nncp-stat/main.go
src/cmd/nncp-xfer/main.go
src/ctx.go
src/go.mod
src/go.sum
src/humanizer.go
src/jobs.go
src/log.go
src/nncp.go
src/pkt.go
src/pkt_test.go
src/progress.go
src/sp.go
src/tmp.go
src/toss.go
src/toss_test.go
src/tx.go
src/tx_test.go

diff --git a/README b/README
index e8bb290c2673dd0e38eb4507e00bf750ca78a3ef..eca5b5a85956ffc622fdc6709d6563f3e782e8be 100644 (file)
--- a/README
+++ b/README
@@ -22,7 +22,7 @@ Home page: http://www.nncpgo.org/
 
 Please send questions regarding the use of NNCP, bug reports and
 patches to nncp-devel mailing list:
-https://lists.cypherpunks.ru/pipermail/nncp-devel/
+http://lists.cypherpunks.ru/nncp_002ddevel.html
 
 Development Git source code repository currently is located here:
 http://www.git.cypherpunks.ru/?p=nncp.git;a=summary
index 9642305f523d752ac4b36edd1f965c32f77d421a..9da6599c164819a6c52cccfb2a87fdc8b4b87710 100644 (file)
--- a/README.RU
+++ b/README.RU
@@ -27,7 +27,7 @@ NNCP это свободное программное обеспечением:
 
 Пожалуйста все вопросы касающиеся использования NNCP, отчёты об ошибках
 и патчи отправляйте в nncp-devel почтовую рассылку:
-https://lists.cypherpunks.ru/pipermail/nncp-devel/
+http://lists.cypherpunks.ru/nncp_002ddevel.html
 
 Исходный код для разработчика находится в Git репозитории:
 http://www.git.cypherpunks.ru/?p=nncp.git;a=summary
index 02c15794a5be8f865462a18ea338106e1d607480..8c46a681cd47483b4774ab9ba1289a957d2a94d3 100644 (file)
@@ -45,7 +45,7 @@ $ echo 'umask: "007"' >> /usr/local/etc/nncp.hjson
 
     Example @url{https://www.newsyslog.org/manual.html, newsyslog}'s entry:
 @example
-/var/spool/nncp/log            644     7       100     *       CYN
+/var/spool/nncp/log            644     7       100     *       BCYN
 @end example
 
 @item
index 81ee9bafd465a0b5d15044c3406fc035b5d431f6..fe55de5456d2b9f82c4aa83b428f6c13155d4c0b 100644 (file)
@@ -31,6 +31,7 @@ calls: [
     {
         cron: "*/5 * * * * * *"
         when-tx-exists: true
+        nock: true
     },
 ]
 @end verbatim
@@ -85,4 +86,14 @@ created, or skip any kind of packet processing.
 @item when-tx-exists
 Call only if packets for sending exists.
 
+@anchor{CfgNoCK}
+@item nock
+NoCK (no-checksumming) tells NNCP not to checksum received files,
+assuming that it will be done later, for example with the
+@ref{nncp-check} command. That can help minimize time spent online,
+because the HDD won't have to simultaneously read the data for
+checksumming and write the received data, but can just write the file
+sequentially. Pay attention that you have to make a call to the remote
+node after checksumming is done, to send a notification about
+successful packet reception. A brief verification sketch follows this
+table.
+
 @end table
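To make the deferred checksumming concrete, here is a small hypothetical Go sketch of the later verification step (the project's real implementation is CheckNoCK in src/check.go further down in this commit). It assumes the spool convention that a packet's filename is the unpadded, standard-alphabet Base32 encoding of its BLAKE2b-256 digest; names and error handling are illustrative only.

// checknock.go -- illustrative sketch, not NNCP's actual code.
package main

import (
	"encoding/base32"
	"errors"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"

	"golang.org/x/crypto/blake2b"
)

// Assumed filename encoding: unpadded standard Base32 of BLAKE2b-256.
var base32Codec = base32.StdEncoding.WithPadding(base32.NoPadding)

func checkNoCK(path string) error {
	verified := strings.TrimSuffix(path, ".nock")
	fd, err := os.Open(path)
	if err != nil {
		return err
	}
	defer fd.Close()
	hsh, err := blake2b.New256(nil)
	if err != nil {
		return err
	}
	// Hash the fully received file...
	if _, err = io.Copy(hsh, fd); err != nil {
		return err
	}
	// ...and compare the digest against the Base32 filename.
	if base32Codec.EncodeToString(hsh.Sum(nil)) != filepath.Base(verified) {
		return errors.New("checksum mismatch")
	}
	// Drop the .nock suffix: the packet becomes an ordinary verified one.
	return os.Rename(path, verified)
}

func main() {
	for _, path := range os.Args[1:] {
		if err := checkNoCK(path); err != nil {
			log.Println(path, err)
		}
	}
}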
index 3e0b882b0582f7c16792b9ed057e18d1b4c46d07..cc6a24ae72190dd020e39a8d41519bb73304c9c0 100644 (file)
@@ -9,6 +9,7 @@ Example @url{https://hjson.org/, Hjson} configuration file:
   log: /var/spool/nncp/log
   umask: "022"
   noprogress: true
+  nohdr: true
 
   notify: {
     file: {
@@ -103,6 +104,9 @@ Enabled @strong{noprogress} option disabled progress showing for many
 commands by default. You can always force its showing with
 @option{-progress} command line option anyway.
 
+@anchor{CfgNoHdr}
+@strong{nohdr} option disables @ref{HdrFile, .hdr} files usage.
+
 @anchor{CfgNotify}
 @strong{notify} section contains notification settings for successfully
 tossed file, freq and exec packets. Corresponding @strong{from} and
index 6510ed889c61067ac62486a338333ee7cd77cd6e..5141b08dd09bf0d98f0f0c4e686d71ca8452d941 100644 (file)
@@ -102,6 +102,7 @@ $ nncp-call [options]
     [-rxrate INT]
     [-txrate INT]
     [-autotoss*]
+    [-nock]
     NODE[:ADDR] [FORCEADDR]
 @end example
 
@@ -114,15 +115,17 @@ transfer.
 
 If @option{-rx} option is specified then only inbound packets
 transmission is performed. If @option{-tx} option is specified, then
-only outbound transmission is performed. @option{-onlinedeadline}
-overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}.
-@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime,
-@emph{maxonlinetime}}. @option{-rxrate}/@option{-txrate} override
-@ref{CfgXxRate, rxrate/txrate}. @option{-list} option allows you to list
-packets of remote node, without any transmission.
+only outbound transmission is performed.
 
-You can specify what packets your want to download, by specifying
-@option{-pkts} option with comma-separated list of packets identifiers.
+@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}.
+@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, @emph{maxonlinetime}}.
+@option{-rxrate}/@option{-txrate} override @ref{CfgXxRate, rxrate/txrate}.
+Read @ref{CfgNoCK, more} about @option{-nock} option.
+
+@option{-list} option allows you to list packets of the remote node,
+without any transmission. You can specify which packets you want to
+download with the @option{-pkts} option, taking a comma-separated list
+of packet identifiers.
 
 Each @option{NODE} can contain several uniquely identified
 @option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do
@@ -230,13 +233,16 @@ operating system.
 @section nncp-check
 
 @example
-$ nncp-check [options]
+$ nncp-check [-nock] [options]
 @end example
 
 Perform @ref{Spool, spool} directory integrity check. Read all files
 that has Base32-encoded filenames and compare it with recalculated
-BLAKE2b hash output of their contents. That supplementary command is
-not used often in practice, if ever.
+BLAKE2b hash output of their contents.
+
+The most useful mode of operation is with the @option{-nock} option,
+which checks the integrity of @file{.nock} files, renaming them to
+ordinary (verified) encrypted packets.
 
 @node nncp-cronexpr
 @section nncp-cronexpr
@@ -252,7 +258,9 @@ next time entities.
 @section nncp-daemon
 
 @example
-$ nncp-daemon [options] [-maxconn INT] [-bind ADDR] [-inetd] [-autotoss*]
+$ nncp-daemon [options]
+    [-maxconn INT] [-bind ADDR] [-inetd]
+    [-autotoss*] [-nock]
 @end example
 
 Start listening TCP daemon, wait for incoming connections and run
@@ -278,6 +286,8 @@ uucp        stream  tcp6    nowait  nncpuser        /usr/local/bin/nncp-daemon      nncp-daemon -quiet -
 during the call. All @option{-autotoss-*} options is the same as in
 @ref{nncp-toss} command.
 
+Read @ref{CfgNoCK, more} about @option{-nock} option.
+
 @node nncp-exec
 @section nncp-exec
 
@@ -509,6 +519,7 @@ $ nncp-rm [options] -tmp
 $ nncp-rm [options] -lock
 $ nncp-rm [options] -node NODE -part
 $ nncp-rm [options] -node NODE -seen
+$ nncp-rm [options] -node NODE -nock
 $ nncp-rm [options] -node NODE [-rx] [-tx]
 $ nncp-rm [options] -node NODE -pkt PKT
 @end example
@@ -529,10 +540,10 @@ Base32 name) will be deleted. This is useful when you see some packet
 failing to be processed.
 
 @item When either @option{-rx} or @option{-tx} options are specified
-(maybe both of them), then delete all packets from that given queues. If
-@option{-part} is given, then delete only @file{.part}ly downloaded
-ones. If @option{-seen} option is specified, then delete only
-@file{.seen} files.
+(maybe both of them), then delete all packets from the given queues.
+@option{-part} option deletes @file{.part}ly downloaded files.
+@option{-seen} option deletes @file{.seen} files. @option{-nock} option
+deletes non-checksummed (non-verified) @file{.nock} files.
 
 @item @option{-dryrun} option just prints what will be deleted.
 
index 473df97e9c1dc508ad07b7e13d285034873f7aac..4223bf312e4d7b1c6496bd5d0cef2a8bab2f1252 100644 (file)
@@ -2,7 +2,7 @@
 @unnumbered Contacts
 
 Please send questions regarding the use of NNCP, bug reports and patches to
-@url{https://lists.cypherpunks.ru/pipermail/nncp-devel/, nncp-devel}
+@url{http://lists.cypherpunks.ru/nncp_002ddevel.html, nncp-devel}
 mailing list. Announcements also go to this mailing list.
 
 Official website is @url{http://www.nncpgo.org/}.
index df6e4b6512fe8651adffd46875c09416860d4d40..8df33b5f5b15b870fdb15d3cb0a5fdc99c78104e 100644 (file)
@@ -25,6 +25,10 @@ Tarballs include all necessary required libraries:
 @multitable {XXXXX} {XXXX-XX-XX} {XXXX KiB} {link sign} {xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx}
 @headitem Version @tab Date @tab Size @tab Tarball @tab SHA256 checksum
 
+@item @ref{Release 6.0.0, 6.0.0} @tab 2021-01-23 @tab 1028 KiB
+@tab @url{download/nncp-6.0.0.tar.xz, link} @url{download/nncp-6.0.0.tar.xz.sig, sign}
+@tab @code{42FE8AA5 4520B3A1 ABB50D66 1BBBA6A1 41CE4E74 9B4816B0 D4C6845D 67465916}
+
 @item @ref{Release 5.6.0, 5.6.0} @tab 2021-01-17 @tab 1024 KiB
 @tab @url{download/nncp-5.6.0.tar.xz, link} @url{download/nncp-5.6.0.tar.xz.sig, sign}
 @tab @code{1DC83F05 F14A3C3B 95820046 C60B170E B8C8936F 142A5B9A 1E943E6F 4CEFBDE3}
index 3359a3a72b25534eaf7e419a169128928433d908..84698ccad61fbf1c40c4cc0478be64279a42d372 100644 (file)
@@ -55,16 +55,27 @@ delivery via NNCP:
 @example
 /usr/local/etc/postfix/master.cf:
 nncp      unix  -       n       n       -       -       pipe
-          flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
+  flags=FRqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
 @end example
 
 This runs the @command{nncp-exec} command to place outgoing mail into
-the NNCP queue after replacing @var{$nexthop} by the the receiving NNCP
+the NNCP queue after replacing @var{$nexthop} by the receiving NNCP
 node and after replacing @var{$recipient} by the recipients. The
 @command{pipe(8)} delivery agent executes the @command{nncp-exec}
 command without assistance from the shell, so there are no problems with
 shell meta characters in command-line parameters.
 
+Pay attention to @code{flags} containing @code{R}, which tells Postfix
+to include the @code{Return-Path:} header. Otherwise that envelope
+sender information will be lost. Possibly you will also need to somehow
+preserve that header on the receiving side, because the
+@command{sendmail} command will replace it. For example you can rename
+it before feeding it to @command{sendmail} with
+@code{reformail -R Return-Path: X-Original-Return-Path: | sendmail}, or
+extract it with:
+
+@verbatiminclude sendmail.sh
+
 @item Specify that mail for @emph{example.com}, should be delivered via
 NNCP, to a host named @emph{nncp-host}:
 
@@ -134,7 +145,7 @@ mail delivery via NNCP:
 @example
 /usr/local/etc/postfix/master.cf:
 nncp      unix  -       n       n       -       -       pipe
-          flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
+  flags=Fqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
 @end example
 
 This runs the @command{nncp-exec} command to place outgoing mail into
index d758060dc1442eae13adf5bced5027008eb86554..9e9c669c401b63f8538a071a09703b188e632c4b 100644 (file)
@@ -1,6 +1,36 @@
 @node Новости
 @section Новости
 
+@node Релиз 6.1.0
+@subsection Релиз 6.1.0
+@itemize
+
+@item
+Оптимизация: большинство команд теперь не держат открытыми файловые
+дескрипторы. Прежде вы легко могли выйти за пределы максимально
+допустимого количества открытых файлов, если у вас было много пакетов в
+spool директории.
+
+@item
+Оптимизация: не закрывать файловый дескриптор файла который мы качаем.
+Прежде каждый его кусочек приводил к дорогим open/close вызовам.
+
+@item
+Скачиваемые в режиме online файлы теперь сохраняются с @file{.nock}
+суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо
+online демоны не выполнят проверку целостности.
+
+@item
+Оптимизация: для файлов, скачивание которых не было продолжено, сразу же
+вычисляется контрольная сумма, пропуская промежуточный @file{.nock} шаг.
+
+@item
+Возможность хранения заголовков зашифрованных пакетов в @file{.hdr}
+файлах, рядом с самими пакетами. Это может существенно повысить скорость
+получения списка пакетов на файловых системах с большим размером блока.
+
+@end itemize
+
 @node Релиз 6.0.0
 @subsection Релиз 6.0.0
 @itemize
index 866fb531d435c1ed6e49e7d44e3df39e61f3f3af..6be8466863d35898b216862c6b5cdd4b727a0874 100644 (file)
@@ -3,11 +3,41 @@
 
 See also this page @ref{Новости, on russian}.
 
+@node Release 6.1.0
+@section Release 6.1.0
+@itemize
+
+@item
+Optimization: most commands do not keep file descriptors opened now.
+Previously you could exceed the maximal number of opened files if you
+had many packets in the spool directory.
+
+@item
+Optimization: do not close the file descriptor of the file we download
+online. Previously each chunk led to expensive open/close calls.
+
+@item
+Files downloaded online are saved with the @file{.nock}
+(non-checksummed) suffix, waiting either for @command{nncp-check} or
+the online daemons to perform the integrity check.
+
+@item
+Optimization: files that are not resumed are checksummed immediately
+during the online download, skipping the intermediate @file{.nock} step.
+
+@item
+Ability to store the encrypted packet's header in a @file{.hdr} file,
+next to the packet itself. That can greatly increase the performance of
+packet listing on filesystems with a big block size.
+
+@end itemize
+
 @node Release 6.0.0
 @section Release 6.0.0
 @itemize
 
-@item Log uses human readable and easy machine parseable
+@item
+Log uses human readable and easy machine parseable
 @url{https://www.gnu.org/software/recutils/, recfile} format for the
 records, instead of structured RFC 3339 lines. Old logs are not readable
 by @command{nncp-log} anymore.
diff --git a/doc/sendmail.sh b/doc/sendmail.sh
new file mode 100755 (executable)
index 0000000..b826091
--- /dev/null
@@ -0,0 +1,6 @@
+#!/bin/sh -e
+
+tmp=`mktemp`
+trap "rm -f $tmp" HUP PIPE INT QUIT TERM EXIT
+cat > $tmp
+sendmail -f "`reformail -x Return-Path: < $tmp`" "$@" < $tmp
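The helper above relies on @command{reformail}. As a rough, hypothetical Go equivalent of doc/sendmail.sh (illustrative only, not part of this commit), one could buffer the message, take the envelope sender from the @code{Return-Path:} header that Postfix was told to include via @code{flags=R}, and re-invoke @command{sendmail -f} with the untouched message:

// sendmail-wrap.go -- speculative stand-in for doc/sendmail.sh.
package main

import (
	"bytes"
	"io"
	"log"
	"net/mail"
	"os"
	"os/exec"
)

func main() {
	// Buffer the whole message: we need both to inspect its headers and
	// to hand the untouched original to sendmail afterwards.
	raw, err := io.ReadAll(os.Stdin)
	if err != nil {
		log.Fatalln(err)
	}
	msg, err := mail.ReadMessage(bytes.NewReader(raw))
	if err != nil {
		log.Fatalln(err)
	}
	from, err := mail.ParseAddress(msg.Header.Get("Return-Path"))
	if err != nil {
		log.Fatalln("no usable Return-Path:", err)
	}
	// Recipients are passed through unchanged, just as in sendmail.sh.
	args := append([]string{"-f", from.Address}, os.Args[1:]...)
	cmd := exec.Command("sendmail", args...)
	cmd.Stdin = bytes.NewReader(raw)
	cmd.Stdout, cmd.Stderr = os.Stdout, os.Stderr
	if err = cmd.Run(); err != nil {
		log.Fatalln(err)
	}
}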
index 5e7812d50b6a0944356aed56ce12bcc488662b55..8dcaa3ead7121328bd8c87cfed38ae77f6cfff37 100644 (file)
@@ -3,39 +3,78 @@
 
 Spool directory holds @ref{Encrypted, encrypted packets} received from
 remote nodes and queued for sending to them. It has the following
-example structure:
+example structure with just single outbound (@code{tx}) packet
+@code{LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ} to the node
+@code{2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ}:
 
 @example
-spool/tmp/
-spool/2WHB...OABQ/rx.lock
-spool/2WHB...OABQ/rx/5ZIB...UMKW.part
-spool/2WHB...OABQ/tx.lock
-spool/2WHB...OABQ/toss.lock
-spool/BYRR...CG6Q/rx.lock
-spool/BYRR...CG6Q/rx/
-spool/BYRR...CG6Q/tx.lock
-spool/BYRR...CG6Q/tx/AQUT...DGNT.seen
-spool/BYRR...CG6Q/tx/NSYY...ZUU6
-spool/BYRR...CG6Q/tx/VCSR...3VXX.seen
-spool/BYRR...CG6Q/tx/ZI5U...5RRQ
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/toss.lock
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx.lock
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx/
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx.lock
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx/LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ
+spool/tmp
 @end example
 
-Except for @file{tmp}, all other directories are Base32-encoded node
-identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example).
-Each node subdirectory has @file{rx} (received, partially received and
-currently unprocessed packets) and @file{tx} (for outbound packets)
-directories.
-
-Each @file{rx}/@file{tx} directory contains one file per encrypted
-packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So
-it can be integrity checked at any time. @file{5ZIB...UMKW.part} is
-partially received file from @file{2WHB...OABQ} node. @file{tx}
-directory can not contain partially written files -- they are moved
-atomically from @file{tmp}.
-
-When @ref{nncp-toss} utility is called with @option{-seen} option, it
-will create empty @file{XXX.seen} files, telling that some kind of
-packet was already tossed sometime.
-
-Only one process can work with @file{rx}/@file{tx} directories at once,
-so there are corresponding lock files.
+@table @file
+
+@item tmp
+directory contains various temporary files that under normal
+circumstances are renamed to their target files inside other
+directories. All directories in @file{spool} @strong{have to} be on the
+same filesystem for that renaming to work.
+
+@item 2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ
+is an example Base32-encoded neighbour identifier.
+
+@item rx, tx
+directories are for incoming and outgoing encrypted packets. @file{rx}
+contains currently unfinished, non-checksummed and otherwise unprocessed
+packets.
+
+@item toss.lock, rx.lock, tx.lock
+Lock files. Only a single process can work with @file{rx}/@file{tx}
+directories at once.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ
+is an example @ref{Encrypted, encrypted packet}. Its filename is the
+Base32-encoded BLAKE2b hash of the whole contents, so it can be
+integrity checked at any time.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.part
+is an example @strong{partly} received file. It can appear only when
+online transfer is used. Its filename is sent by the remote side and
+plays no role until the file is fully downloaded.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.nock
+is a non-checksummed (NoCK) @strong{fully} received file. Its checksum
+is verified against its filename either by @ref{nncp-check}, or by the
+working online daemons. If it is correct, then its extension is trimmed.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.seen
+@ref{nncp-toss} utility can be invoked with the @option{-seen} option,
+leading to the creation of @file{.seen} files, telling that the packet
+with the specified hash has already been processed before. That can be
+useful in use-cases where multiple ways of packet transfer are available
+and duplicates may be received. You have to remove those files manually
+when you do not need them anymore (probably because they have expired).
+
+@anchor{HdrFile}
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.hdr
+Unless the @ref{CfgNoHdr, nohdr} option is enabled in the configuration
+file, a @file{.hdr} file is automatically created for every ordinary
+(fully received and checksummed) packet. It literally contains just the
+header of the corresponding packet and will be created even during a
+simple @ref{nncp-stat} call. On filesystems with a big block size (ZFS
+for example) it can greatly speed up listing the packets in
+directories, because it prevents unnecessary read amplification. On
+other filesystems it probably won't help at all, or may even harm
+performance.
+
+There is a hack: you can create a denser @file{.hdr} allocation by
+removing all @file{.hdr} files and then running @command{nncp-stat},
+which will recreate them. In many cases the @file{.hdr} files will then
+be allocated more or less linearly on the disk, decreasing listing time
+even more. A minimal @file{.hdr} decoding sketch follows this table.
+
+@end table
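Since a @file{.hdr} file literally contains just the XDR-serialized encrypted packet header, it can be inspected with a few lines of Go. A speculative sketch using the libraries this repository already depends on (go.cypherpunks.ru/nncp/v5 and go-xdr); only fields referenced elsewhere in this commit are touched, everything else is illustrative.

// hdrdump.go -- illustrative only: decode a spool .hdr file (or the
// beginning of a full packet) without reading the packet body.
package main

import (
	"fmt"
	"log"
	"os"

	xdr "github.com/davecgh/go-xdr/xdr2"
	"go.cypherpunks.ru/nncp/v5"
)

func main() {
	// e.g. spool/<NODE>/rx/<PKT>.hdr
	fd, err := os.Open(os.Args[1])
	if err != nil {
		log.Fatalln(err)
	}
	defer fd.Close()
	var pktEnc nncp.PktEnc
	if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil {
		log.Fatalln(err)
	}
	if pktEnc.Magic != nncp.MagicNNCPEv4 {
		log.Fatalln("not an NNCP encrypted packet header")
	}
	fmt.Println("niceness:", nncp.NicenessFmt(pktEnc.Nice))
}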
index 8537c9e24bceab1ccd61e900d9e79d2393f9e0fa..eef4931af8725dc9c8626572e3c7d4b8a5a4c97d 100755 (executable)
@@ -170,7 +170,7 @@ GPG key ID: 0x2B25868E75A1A953 NNCP releases <releases@nncpgo.org>
 Fingerprint: 92C2 F0AE FE73 208E 46BF  F3DE 2B25 868E 75A1 A953
 
 Please send questions regarding the use of NNCP, bug reports and patches
-to mailing list: https://lists.cypherpunks.ru/pipermail/nncp-devel/
+to mailing list: http://lists.cypherpunks.ru/nncp_002ddevel.html
 EOF
 
 cat <<EOF
@@ -220,5 +220,5 @@ SHA256 хэш: $hash
 
 Пожалуйста, все вопросы касающиеся использования NNCP, отчёты об ошибках
 и патчи отправляйте в nncp-devel почтовую рассылку:
-https://lists.cypherpunks.ru/pipermail/nncp-devel/
+http://lists.cypherpunks.ru/nncp_002ddevel.html
 EOF
index 0a16524ade0c79e2596b3a126ba49be0bdd1393e..47e134a80cb3a000c047fdc7d4b230a7f731ac85 100644 (file)
@@ -1 +1 @@
-/var/spool/nncp/log            644     7       100     *       CYN
+/var/spool/nncp/log            644     7       100     *       BCYN
index ef4d07d757c90097d39ef486dfcd6bf938bb7259..2f635d63d0aa7d36b9fb33a87206cd173db9f4fb 100644 (file)
@@ -34,6 +34,7 @@ type Call struct {
        OnlineDeadline time.Duration
        MaxOnlineTime  time.Duration
        WhenTxExists   bool
+       NoCK           bool
 
        AutoToss       bool
        AutoTossDoSeen bool
@@ -51,6 +52,7 @@ func (ctx *Ctx) CallNode(
        rxRate, txRate int,
        onlineDeadline, maxOnlineTime time.Duration,
        listOnly bool,
+       noCK bool,
        onlyPkts map[[32]byte]bool,
 ) (isGood bool) {
        for _, addr := range addrs {
@@ -78,6 +80,7 @@ func (ctx *Ctx) CallNode(
                        rxRate:         rxRate,
                        txRate:         txRate,
                        listOnly:       listOnly,
+                       NoCK:           noCK,
                        onlyPkts:       onlyPkts,
                }
                if err = state.StartI(conn); err == nil {
index 946243396b7403c99aa499e22fd870a123516365..6e781d19e62a0ca1d6f0e628466f91be8af22a6d 100644 (file)
@@ -82,6 +82,7 @@ type CallJSON struct {
        OnlineDeadline *uint   `json:"onlinedeadline,omitempty"`
        MaxOnlineTime  *uint   `json:"maxonlinetime,omitempty"`
        WhenTxExists   *bool   `json:"when-tx-exists,omitempty"`
+       NoCK           *bool   `json:"nock"`
 
        AutoToss       *bool `json:"autotoss,omitempty"`
        AutoTossDoSeen *bool `json:"autotoss-doseen,omitempty"`
@@ -118,6 +119,7 @@ type CfgJSON struct {
        Umask string `json:"umask,omitempty"`
 
        OmitPrgrs bool `json:"noprogress,omitempty"`
+       NoHdr     bool `json:"nohdr,omitempty"`
 
        Notify *NotifyJSON `json:"notify,omitempty"`
 
@@ -284,6 +286,9 @@ func NewNode(name string, cfg NodeJSON) (*Node, error) {
                if callCfg.WhenTxExists != nil {
                        call.WhenTxExists = *callCfg.WhenTxExists
                }
+               if callCfg.NoCK != nil {
+                       call.NoCK = *callCfg.NoCK
+               }
                if callCfg.AutoToss != nil {
                        call.AutoToss = *callCfg.AutoToss
                }
@@ -459,11 +464,16 @@ func CfgParse(data []byte) (*Ctx, error) {
        if cfgJSON.OmitPrgrs {
                showPrgrs = false
        }
+       hdrUsage := true
+       if cfgJSON.NoHdr {
+               hdrUsage = false
+       }
        ctx := Ctx{
                Spool:      spoolPath,
                LogPath:    logPath,
                UmaskForce: umaskForce,
                ShowPrgrs:  showPrgrs,
+               HdrUsage:   hdrUsage,
                Self:       self,
                Neigh:      make(map[NodeId]*Node, len(cfgJSON.Neigh)),
                Alias:      make(map[string]*NodeId),
index 731e86937c55cb7b2840bbad3d2a1e67cca17398..ac26e9ce3a322d227ff8530b4a7fe35c8b9442d8 100644 (file)
@@ -23,10 +23,14 @@ import (
        "errors"
        "io"
        "log"
+       "os"
+       "path/filepath"
 
        "golang.org/x/crypto/blake2b"
 )
 
+const NoCKSuffix = ".nock"
+
 func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error) {
        hsh, err := blake2b.New256(nil)
        if err != nil {
@@ -47,8 +51,13 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool {
                        {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])},
                        {"FullSize", job.Size},
                }
-               gut, err := Check(job.Fd, job.HshValue[:], les, ctx.ShowPrgrs)
-               job.Fd.Close() // #nosec G104
+               fd, err := os.Open(job.Path)
+               if err != nil {
+                       ctx.LogE("check", les, err, "")
+                       return true
+               }
+               gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs)
+               fd.Close() // #nosec G104
                if err != nil {
                        ctx.LogE("check", les, err, "")
                        return true
@@ -64,3 +73,47 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool {
 func (ctx *Ctx) Check(nodeId *NodeId) bool {
        return !(ctx.checkXxIsBad(nodeId, TRx) || ctx.checkXxIsBad(nodeId, TTx))
 }
+
+func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[32]byte) (int64, error) {
+       dirToSync := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
+       pktName := Base32Codec.EncodeToString(hshValue[:])
+       pktPath := filepath.Join(dirToSync, pktName)
+       fd, err := os.Open(pktPath + NoCKSuffix)
+       if err != nil {
+               return 0, err
+       }
+       defer fd.Close()
+       fi, err := fd.Stat()
+       if err != nil {
+               return 0, err
+       }
+       size := fi.Size()
+       les := LEs{
+               {"XX", string(TRx)},
+               {"Node", nodeId},
+               {"Pkt", pktName},
+               {"FullSize", size},
+       }
+       gut, err := Check(fd, hshValue[:], les, ctx.ShowPrgrs)
+       if err != nil || !gut {
+               return 0, errors.New("checksum mismatch")
+       }
+       if err = os.Rename(pktPath+NoCKSuffix, pktPath); err != nil {
+               return 0, err
+       }
+       if err = DirSync(dirToSync); err != nil {
+               return size, err
+       }
+       if ctx.HdrUsage {
+               if _, err = fd.Seek(0, io.SeekStart); err != nil {
+                       return size, err
+               }
+               _, pktEncRaw, err := ctx.HdrRead(fd)
+               if err != nil {
+                       return size, err
+               }
+               ctx.HdrWrite(pktEncRaw, pktPath)
+       }
+       return size, err
+}
index 0dbe3ba34a45b9ac1e3ab97562891cab77385937..60a9330fd4bb3e6fac8dd7b62ec689bd43e73e8b 100644 (file)
@@ -125,13 +125,16 @@ func main() {
                                {K: "Pkt", V: "dummy"},
                        }
                        for job := range ctx.Jobs(&nodeId, nncp.TTx) {
-                               pktName = filepath.Base(job.Fd.Name())
+                               pktName = filepath.Base(job.Path)
                                les[len(les)-1].V = pktName
                                if job.PktEnc.Nice > nice {
                                        ctx.LogD("nncp-bundle", les, "too nice")
-                                       job.Fd.Close() // #nosec G104
                                        continue
                                }
+                               fd, err := os.Open(job.Path)
+                               if err != nil {
+                                       log.Fatalln("Error during opening:", err)
+                               }
                                if err = tarWr.WriteHeader(&tar.Header{
                                        Format:   tar.FormatUSTAR,
                                        Name:     nncp.NNCPBundlePrefix,
@@ -155,7 +158,7 @@ func main() {
                                        log.Fatalln("Error writing tar header:", err)
                                }
                                if _, err = nncp.CopyProgressed(
-                                       tarWr, job.Fd, "Tx",
+                                       tarWr, bufio.NewReader(fd), "Tx",
                                        append(les, nncp.LEs{
                                                {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
                                                {K: "FullSize", V: job.Size},
@@ -164,7 +167,9 @@ func main() {
                                ); err != nil {
                                        log.Fatalln("Error during copying to tar:", err)
                                }
-                               job.Fd.Close() // #nosec G104
+                               if err = fd.Close(); err != nil {
+                                       log.Fatalln("Error during closing:", err)
+                               }
                                if err = tarWr.Flush(); err != nil {
                                        log.Fatalln("Error during tar flushing:", err)
                                }
@@ -172,8 +177,10 @@ func main() {
                                        log.Fatalln("Error during stdout flushing:", err)
                                }
                                if *doDelete {
-                                       if err = os.Remove(job.Fd.Name()); err != nil {
+                                       if err = os.Remove(job.Path); err != nil {
                                                log.Fatalln("Error during deletion:", err)
+                                       } else if ctx.HdrUsage {
+                                               os.Remove(job.Path + nncp.HdrSuffix)
                                        }
                                }
                                ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "")
@@ -298,7 +305,10 @@ func main() {
                                if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
                                        ctx.LogI("nncp-bundle", les, "removed")
                                        if !*dryRun {
-                                               os.Remove(dstPath) // #nosec G104
+                                               os.Remove(dstPath)
+                                               if ctx.HdrUsage {
+                                                       os.Remove(dstPath + nncp.HdrSuffix)
+                                               }
                                        }
                                } else {
                                        ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
@@ -407,6 +417,9 @@ func main() {
                                        if err = nncp.DirSync(dstDirPath); err != nil {
                                                log.Fatalln("Error during syncing:", err)
                                        }
+                                       if ctx.HdrUsage {
+                                               ctx.HdrWrite(pktEncBuf, dstPath)
+                                       }
                                }
                        }
                        for _, le := range les {
index 9762d9216cd950f00390d1ea9a2ba1422eeb03a5..3136e3e9221e8a11236addfc917b0a4be24ae1e4 100644 (file)
@@ -44,6 +44,7 @@ func main() {
                rxOnly      = flag.Bool("rx", false, "Only receive packets")
                txOnly      = flag.Bool("tx", false, "Only transmit packets")
                listOnly    = flag.Bool("list", false, "Only list remote packets")
+               noCK        = flag.Bool("nock", false, "Do no checksum checking")
                onlyPktsRaw = flag.String("pkts", "", "Recieve only that packets, comma separated")
                rxRate      = flag.Int("rxrate", 0, "Maximal receive rate, pkts/sec")
                txRate      = flag.Int("txrate", 0, "Maximal transmit rate, pkts/sec")
@@ -185,6 +186,7 @@ func main() {
                onlineDeadline,
                maxOnlineTime,
                *listOnly,
+               *noCK,
                onlyPkts,
        )
 
index b702b498038ca4eeb04a4e01b29e372ea70e83f4..7ccefdd7a56b7eb622bb03665ef54c3d8a44e524 100644 (file)
@@ -138,7 +138,6 @@ func main() {
                                                        ctx.LogD("caller", les, "checking tx existence")
                                                        txExists := false
                                                        for job := range ctx.Jobs(node.Id, nncp.TTx) {
-                                                               job.Fd.Close()
                                                                if job.PktEnc.Nice > call.Nice {
                                                                        continue
                                                                }
@@ -177,6 +176,7 @@ func main() {
                                                        call.OnlineDeadline,
                                                        call.MaxOnlineTime,
                                                        false,
+                                                       call.NoCK,
                                                        nil,
                                                )
 
index 798f31cdb534f0e893ba495211df0611c3d136b5..f511c356cd76fb8fbe4b6c03dcfe658b0d53b128 100644 (file)
@@ -104,6 +104,8 @@ func main() {
   # umask: "022"
   # Omit progress showing by default
   # noprogress: true
+  # Do not use .hdr files
+  # nohdr: true
 
   # Enable notification email sending
   # notify: {
@@ -211,6 +213,7 @@ func main() {
     #   #     xx: rx
     #   #     addr: lan
     #   #     when-tx-exists: true
+    #   #     nock: true
     #   #
     #   #     autotoss: false
     #   #     autotoss-doseen: true
index 96ddd3fcf00520157ae4bb8f06778b24528810e0..e985b79ce729fbc8e76bada4f7b70842927ead6d 100644 (file)
@@ -23,6 +23,7 @@ import (
        "fmt"
        "log"
        "os"
+       "path/filepath"
 
        "go.cypherpunks.ru/nncp/v5"
 )
@@ -30,12 +31,13 @@ import (
 func usage() {
        fmt.Fprintf(os.Stderr, nncp.UsageHeader())
        fmt.Fprintf(os.Stderr, "nncp-check -- verify Rx/Tx packets checksum\n\n")
-       fmt.Fprintf(os.Stderr, "Usage: %s [options]\nOptions:\n", os.Args[0])
+       fmt.Fprintf(os.Stderr, "Usage: %s [-nock] [options]\nOptions:\n", os.Args[0])
        flag.PrintDefaults()
 }
 
 func main() {
        var (
+               nock      = flag.Bool("nock", false, "Process .nock files")
                cfgPath   = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
                nodeRaw   = flag.String("node", "", "Process only that node")
                spoolPath = flag.String("spool", "", "Override path to spool")
@@ -85,7 +87,20 @@ func main() {
                if nodeOnly != nil && nodeId != *nodeOnly.Id {
                        continue
                }
-               if !ctx.Check(node.Id) {
+               if *nock {
+                       for job := range ctx.JobsNoCK(node.Id) {
+                               if _, err = ctx.CheckNoCK(node.Id, job.HshValue); err != nil {
+                                       pktName := nncp.Base32Codec.EncodeToString(job.HshValue[:])
+                                       log.Println(filepath.Join(
+                                               ctx.Spool,
+                                               nodeId.String(),
+                                               string(nncp.TRx),
+                                               pktName+nncp.NoCKSuffix,
+                                       ), err)
+                                       isBad = true
+                               }
+                       }
+               } else if !ctx.Check(node.Id) {
                        isBad = true
                }
        }
index 6319debf56957061cbb1f34a41bf0f0db0ae6ab7..e3a73c9e72c162da46a8914a1b6709aed83d41b6 100644 (file)
@@ -70,11 +70,13 @@ func performSP(
        ctx *nncp.Ctx,
        conn nncp.ConnDeadlined,
        nice uint8,
+       noCK bool,
        nodeIdC chan *nncp.NodeId,
 ) {
        state := nncp.SPState{
                Ctx:  ctx,
                Nice: nice,
+               NoCK: noCK,
        }
        if err := state.StartR(conn); err == nil {
                ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected")
@@ -108,6 +110,7 @@ func main() {
                bind      = flag.String("bind", "[::]:5400", "Address to bind to")
                inetd     = flag.Bool("inetd", false, "Is it started as inetd service")
                maxConn   = flag.Int("maxconn", 128, "Maximal number of simultaneous connections")
+               noCK      = flag.Bool("nock", false, "Do no checksum checking")
                spoolPath = flag.String("spool", "", "Override path to spool")
                logPath   = flag.String("log", "", "Override path to logfile")
                quiet     = flag.Bool("quiet", false, "Print only errors")
@@ -160,7 +163,7 @@ func main() {
                os.Stderr.Close() // #nosec G104
                conn := &InetdConn{os.Stdin, os.Stdout}
                nodeIdC := make(chan *nncp.NodeId)
-               go performSP(ctx, conn, nice, nodeIdC)
+               go performSP(ctx, conn, nice, *noCK, nodeIdC)
                nodeId := <-nodeIdC
                var autoTossFinish chan struct{}
                var autoTossBadCode chan bool
@@ -197,7 +200,7 @@ func main() {
                ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted")
                go func(conn net.Conn) {
                        nodeIdC := make(chan *nncp.NodeId)
-                       go performSP(ctx, conn, nice, nodeIdC)
+                       go performSP(ctx, conn, nice, *noCK, nodeIdC)
                        nodeId := <-nodeIdC
                        var autoTossFinish chan struct{}
                        var autoTossBadCode chan bool
index d5609e609514a5ce76d2dea516ab766deb0e6957..b036d7080a0f835e7540e7c0f2a0be652a8364df 100644 (file)
@@ -39,6 +39,8 @@ func usage() {
        fmt.Fprintf(os.Stderr, "       %s [options] -lock\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -part\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -seen\n", os.Args[0])
+       fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -nock\n", os.Args[0])
+       fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -hdr\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE {-rx|-tx}\n", os.Args[0])
        fmt.Fprintf(os.Stderr, "       %s [options] -node NODE -pkt PKT\n", os.Args[0])
        fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays")
@@ -50,12 +52,14 @@ func main() {
        var (
                cfgPath   = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
                doTmp     = flag.Bool("tmp", false, "Remove all temporary files")
+               doHdr     = flag.Bool("hdr", false, "Remove all .hdr files")
                doLock    = flag.Bool("lock", false, "Remove all lock files")
                nodeRaw   = flag.String("node", "", "Node to remove files in")
                doRx      = flag.Bool("rx", false, "Process received packets")
                doTx      = flag.Bool("tx", false, "Process transfered packets")
                doPart    = flag.Bool("part", false, "Remove only .part files")
                doSeen    = flag.Bool("seen", false, "Remove only .seen files")
+               doNoCK    = flag.Bool("nock", false, "Remove only .nock files")
                older     = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units")
                dryRun    = flag.Bool("dryrun", false, "Do not actually remove files")
                pktRaw    = flag.String("pkt", "", "Packet to remove")
@@ -176,14 +180,10 @@ func main() {
                                        ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping")
                                        return nil
                                }
-                               if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) {
-                                       ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
-                                       if *dryRun {
-                                               return nil
-                                       }
-                                       return os.Remove(path)
-                               }
-                               if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) {
+                               if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) ||
+                                       (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) ||
+                                       (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) ||
+                                       (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) {
                                        ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
                                        if *dryRun {
                                                return nil
@@ -197,8 +197,7 @@ func main() {
                                        }
                                        return os.Remove(path)
                                }
-                               if !*doSeen &&
-                                       !*doPart &&
+                               if !*doSeen && !*doNoCK && !*doHdr && !*doPart &&
                                        (*doRx || *doTx) &&
                                        ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) {
                                        ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
@@ -210,12 +209,12 @@ func main() {
                                return nil
                        })
        }
-       if *pktRaw != "" || *doRx || *doSeen || *doPart {
+       if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doHdr || *doPart {
                if err = remove(nncp.TRx); err != nil {
                        log.Fatalln("Can not remove:", err)
                }
        }
-       if *pktRaw != "" || *doTx {
+       if *pktRaw != "" || *doTx || *doHdr {
                if err = remove(nncp.TTx); err != nil {
                        log.Fatalln("Can not remove:", err)
                }
index 04aaed24c0d5eef77a0ea0bfb5665a28c2b48898..bd350c71def19ca60f7feb35ee64178ab4f04144 100644 (file)
@@ -98,18 +98,25 @@ func main() {
                fmt.Println(node.Name)
                rxNums := make(map[uint8]int)
                rxBytes := make(map[uint8]int64)
+               noCKNums := make(map[uint8]int)
+               noCKBytes := make(map[uint8]int64)
                for job := range ctx.Jobs(node.Id, nncp.TRx) {
-                       job.Fd.Close() // #nosec G104
                        if *showPkt {
                                jobPrint(nncp.TRx, job)
                        }
                        rxNums[job.PktEnc.Nice] = rxNums[job.PktEnc.Nice] + 1
                        rxBytes[job.PktEnc.Nice] = rxBytes[job.PktEnc.Nice] + job.Size
                }
+               for job := range ctx.JobsNoCK(node.Id) {
+                       if *showPkt {
+                               jobPrint(nncp.TRx, job)
+                       }
+                       noCKNums[job.PktEnc.Nice] = noCKNums[job.PktEnc.Nice] + 1
+                       noCKBytes[job.PktEnc.Nice] = noCKBytes[job.PktEnc.Nice] + job.Size
+               }
                txNums := make(map[uint8]int)
                txBytes := make(map[uint8]int64)
                for job := range ctx.Jobs(node.Id, nncp.TTx) {
-                       job.Fd.Close() // #nosec G104
                        if *showPkt {
                                jobPrint(nncp.TTx, job)
                        }
@@ -120,17 +127,26 @@ func main() {
                for nice = 1; nice > 0; nice++ {
                        rxNum, rxExists := rxNums[nice]
                        txNum, txExists := txNums[nice]
-                       if !(rxExists || txExists) {
+                       noCKNum, noCKExists := noCKNums[nice]
+                       if !(rxExists || txExists || noCKExists) {
                                continue
                        }
                        fmt.Printf(
-                               "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts\n",
+                               "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts",
                                nncp.NicenessFmt(nice),
                                humanize.IBytes(uint64(rxBytes[nice])),
                                rxNum,
                                humanize.IBytes(uint64(txBytes[nice])),
                                txNum,
                        )
+                       if noCKExists {
+                               fmt.Printf(
+                                       " | NoCK: % 10s, % 3d pkts",
+                                       humanize.IBytes(uint64(noCKBytes[nice])),
+                                       noCKNum,
+                               )
+                       }
+                       fmt.Printf("\n")
                }
        }
 }
index 06db800fc7588e427d9190be4e4de89af12b2d47..093f1083c1bdf12a9e615136bc53ba0f908d43d7 100644 (file)
@@ -28,7 +28,6 @@ import (
        "os"
        "path/filepath"
 
-       xdr "github.com/davecgh/go-xdr/xdr2"
        "go.cypherpunks.ru/nncp/v5"
 )
 
@@ -183,8 +182,7 @@ func main() {
                                isBad = true
                                continue
                        }
-                       var pktEnc nncp.PktEnc
-                       _, err = xdr.Unmarshal(fd, &pktEnc)
+                       pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
                        if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 {
                                ctx.LogD("nncp-xfer", les, "is not a packet")
                                fd.Close() // #nosec G104
@@ -249,6 +247,14 @@ func main() {
                                        isBad = true
                                }
                        }
+                       if ctx.HdrUsage {
+                               ctx.HdrWrite(pktEncRaw, filepath.Join(
+                                       ctx.Spool,
+                                       nodeId.String(),
+                                       string(nncp.TRx),
+                                       tmp.Checksum(),
+                               ))
+                       }
                }
        }
 
@@ -315,39 +321,42 @@ Tx:
                }
                les = les[:len(les)-1]
                for job := range ctx.Jobs(&nodeId, nncp.TTx) {
-                       pktName := filepath.Base(job.Fd.Name())
+                       pktName := filepath.Base(job.Path)
                        les := append(les, nncp.LE{K: "Pkt", V: pktName})
                        if job.PktEnc.Nice > nice {
                                ctx.LogD("nncp-xfer", les, "too nice")
-                               job.Fd.Close() // #nosec G104
                                continue
                        }
                        if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) {
                                ctx.LogD("nncp-xfer", les, "already exists")
-                               job.Fd.Close() // #nosec G104
                                continue
                        }
                        if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) {
                                ctx.LogD("nncp-xfer", les, "already exists")
-                               job.Fd.Close() // #nosec G104
                                continue
                        }
                        tmp, err := nncp.TempFile(dstPath, "xfer")
                        if err != nil {
                                ctx.LogE("nncp-xfer", les, err, "mktemp")
-                               job.Fd.Close() // #nosec G104
                                isBad = true
                                break
                        }
                        les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()})
                        ctx.LogD("nncp-xfer", les, "created")
+                       fd, err := os.Open(job.Path)
+                       if err != nil {
+                               ctx.LogE("nncp-xfer", les, err, "open")
+                               tmp.Close() // #nosec G104
+                               isBad = true
+                               continue
+                       }
                        bufW := bufio.NewWriter(tmp)
                        copied, err := nncp.CopyProgressed(
-                               bufW, bufio.NewReader(job.Fd), "Tx",
+                               bufW, bufio.NewReader(fd), "Tx",
                                append(les, nncp.LE{K: "FullSize", V: job.Size}),
                                ctx.ShowPrgrs,
                        )
-                       job.Fd.Close() // #nosec G104
+                       fd.Close() // #nosec G104
                        if err != nil {
                                ctx.LogE("nncp-xfer", les, err, "copy")
                                tmp.Close() // #nosec G104
@@ -383,9 +392,11 @@ Tx:
                        les = les[:len(les)-1]
                        ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "")
                        if !*keep {
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("nncp-xfer", les, err, "remove")
                                        isBad = true
+                               } else if ctx.HdrUsage {
+                                       os.Remove(job.Path + nncp.HdrSuffix)
                                }
                        }
                }
index ebd31024db7765b0c248300d317dd8afd2ad6a4e..d83882800180f176b6fcf03ee632aba00a427ef8 100644 (file)
@@ -40,6 +40,7 @@ type Ctx struct {
        UmaskForce *int
        Quiet      bool
        ShowPrgrs  bool
+       HdrUsage   bool
        Debug      bool
        NotifyFile *FromToJSON
        NotifyFreq *FromToJSON
index 2742c743a2a912b3f9b1e86b6062d1703dda8d5e..a85ef940965d4252eda61897f7fb844dcde8560f 100644 (file)
@@ -6,14 +6,14 @@ require (
        github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6
        github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
        github.com/hjson/hjson-go v3.1.0+incompatible
-       github.com/klauspost/compress v1.11.4
+       github.com/klauspost/compress v1.11.7
        github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
        go.cypherpunks.ru/balloon v1.1.1
        go.cypherpunks.ru/recfile v0.4.3
-       golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
-       golang.org/x/net v0.0.0-20201224014010-6772e930b67b
-       golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
-       golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
+       golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
+       golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d
+       golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43
+       golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
        gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
 )
 
index 8ea9c6f983ceabc8554e2e74be23dcda35c010a6..83c3eac7f123449bc29b58b8ae59718f1ac6c802 100644 (file)
@@ -8,8 +8,8 @@ github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVf
 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
 github.com/hjson/hjson-go v3.1.0+incompatible h1:DY/9yE8ey8Zv22bY+mHV1uk2yRy0h8tKhZ77hEdi0Aw=
 github.com/hjson/hjson-go v3.1.0+incompatible/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio=
-github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU=
-github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
+github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -20,19 +20,21 @@ go.cypherpunks.ru/balloon v1.1.1/go.mod h1:k4s4ozrIrhpBjj78Z7LX8ZHxMQ+XE7DZUWl8g
 go.cypherpunks.ru/recfile v0.4.3 h1:ephokihmV//p0ob6gx2FWXvm28/NBDbWTOJPUNahxO8=
 go.cypherpunks.ru/recfile v0.4.3/go.mod h1:sR+KajB+vzofL3SFVFwKt3Fke0FaCcN1g3YPNAhU3qI=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=
-golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
+golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
+golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
-golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d h1:1aflnvSoWWLI2k/dMUAl5lvU1YO4Mb4hz0gh+1rjcxU=
+golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos=
-golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 h1:SgQ6LNaYJU0JIuEHv9+s6EbhSCwYeAf5Yvj6lpYlqAE=
+golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
+golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
index 9ea5e99a492c30af5146a097685c6fef879a86c3..c0de50ff63f5975ecf289baa781cdf6fa620f2b4 100644 (file)
@@ -47,13 +47,14 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
        if err == nil {
                nodeS = node.Name
        }
+       var sizeParsed uint64
        var size string
        if sizeRaw, exists := le["Size"]; exists {
-               sp, err := strconv.ParseUint(sizeRaw, 10, 64)
+               sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64)
                if err != nil {
                        return "", err
                }
-               size = humanize.IBytes(uint64(sp))
+               size = humanize.IBytes(sizeParsed)
        }
 
        var msg string
@@ -203,7 +204,6 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
                if err, exists := le["Err"]; exists {
                        msg += ": " + err
                }
-
        case "sp-info":
                nice, err := NicenessParse(le["Nice"])
                if err != nil {
@@ -213,15 +213,13 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
                        "Packet %s (%s) (nice %s)",
                        le["Pkt"], size, NicenessFmt(nice),
                )
-               offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64)
-               if err != nil {
-                       return "", err
-               }
-               sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
-               if err != nil {
-                       return "", err
+               if offset := le["Offset"]; offset != "" {
+                       offsetParsed, err := strconv.ParseUint(offset, 10, 64)
+                       if err != nil {
+                               return "", err
+                       }
+                       msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
                }
-               msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
                if m, exists := le["Msg"]; exists {
                        msg += ": " + m
                }
@@ -250,10 +248,6 @@ func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
                if err != nil {
                        return "", err
                }
-               sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
-               if err != nil {
-                       return "", err
-               }
                msg += fmt.Sprintf(
                        "%s %d%% (%s / %s)",
                        le["Pkt"],
index 8fe45642d4e8e33998fb89950163973c3ced5f20..37a31b1bcb2d6ec3eee58b17430e361cdd8c1b0d 100644 (file)
--- a/src/jobs.go
+++ b/src/jobs.go
@@ -18,9 +18,10 @@ along with this program.  If not, see <http://www.gnu.org/licenses/>.
 package nncp
 
 import (
-       "io"
+       "bytes"
        "os"
        "path/filepath"
+       "strings"
 
        xdr "github.com/davecgh/go-xdr/xdr2"
 )
@@ -30,16 +31,54 @@ type TRxTx string
 const (
        TRx TRxTx = "rx"
        TTx TRxTx = "tx"
+
+       HdrSuffix = ".hdr"
 )
 
 type Job struct {
        PktEnc   *PktEnc
-       Fd       *os.File
+       Path     string
        Size     int64
        HshValue *[32]byte
 }
 
-func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) {
+       var pktEnc PktEnc
+       _, err := xdr.Unmarshal(fd, &pktEnc)
+       if err != nil {
+               return nil, nil, err
+       }
+       var raw bytes.Buffer
+       if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
+               panic(err)
+       }
+       return &pktEnc, raw.Bytes(), nil
+}
+
+func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
+       tmpHdr, err := ctx.NewTmpFile()
+       if err != nil {
+               ctx.LogE("hdr-write", []LE{}, err, "new")
+               return err
+       }
+       if _, err = tmpHdr.Write(pktEncRaw); err != nil {
+               ctx.LogE("hdr-write", []LE{}, err, "write")
+               os.Remove(tmpHdr.Name())
+               return err
+       }
+       if err = tmpHdr.Close(); err != nil {
+               ctx.LogE("hdr-write", []LE{}, err, "close")
+               os.Remove(tmpHdr.Name())
+               return err
+       }
+       if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
+               ctx.LogE("hdr-write", []LE{}, err, "rename")
+               return err
+       }
+       return err
+}
+
+func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job {
        rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
        jobs := make(chan Job, 16)
        go func() {
@@ -54,33 +93,58 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
                        return
                }
                for _, fi := range fis {
-                       hshValue, err := Base32Codec.DecodeString(fi.Name())
-                       if err != nil {
-                               continue
+                       name := fi.Name()
+                       var hshValue []byte
+                       if nock {
+                               if !strings.HasSuffix(name, NoCKSuffix) ||
+                                       len(name) != Base32Encoded32Len+len(NoCKSuffix) {
+                                       continue
+                               }
+                               hshValue, err = Base32Codec.DecodeString(
+                                       strings.TrimSuffix(name, NoCKSuffix),
+                               )
+                       } else {
+                               if len(name) != Base32Encoded32Len {
+                                       continue
+                               }
+                               hshValue, err = Base32Codec.DecodeString(name)
                        }
-                       fd, err := os.Open(filepath.Join(rxPath, fi.Name()))
                        if err != nil {
                                continue
                        }
-                       var pktEnc PktEnc
-                       if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 {
-                               fd.Close() // #nosec G104
+                       pth := filepath.Join(rxPath, name)
+                       hdrExists := true
+                       var fd *os.File
+                       if nock {
+                               fd, err = os.Open(pth)
+                       } else {
+                               fd, err = os.Open(pth + HdrSuffix)
+                               if err != nil && os.IsNotExist(err) {
+                                       hdrExists = false
+                                       fd, err = os.Open(pth)
+                               }
+                       }
+                       if err != nil {
                                continue
                        }
-                       if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                               fd.Close() // #nosec G104
+                       pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
+                       fd.Close()
+                       if err != nil || pktEnc.Magic != MagicNNCPEv4 {
                                continue
                        }
                        ctx.LogD("jobs", LEs{
                                {"XX", string(xx)},
                                {"Node", pktEnc.Sender},
-                               {"Name", fi.Name()},
+                               {"Name", name},
                                {"Nice", int(pktEnc.Nice)},
                                {"Size", fi.Size()},
                        }, "taken")
+                       if !hdrExists && ctx.HdrUsage {
+                               ctx.HdrWrite(pktEncRaw, pth)
+                       }
                        job := Job{
-                               PktEnc:   &pktEnc,
-                               Fd:       fd,
+                               PktEnc:   pktEnc,
+                               Path:     pth,
                                Size:     fi.Size(),
                                HshValue: new([32]byte),
                        }
@@ -90,3 +154,11 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
        }()
        return jobs
 }
+
+func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+       return ctx.jobsFind(nodeId, xx, false)
+}
+
+func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
+       return ctx.jobsFind(nodeId, TRx, true)
+}
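
jobs.go now hands out spool entries by path instead of keeping an open file descriptor per job, and caches the encrypted packet header in a sibling ".hdr" file so later scans can read that small file instead of opening the full packet (nncp derives the header from the marshalled PktEnc; see HdrRead/HdrWrite above). Below is a standalone sketch of the caching step only, assuming a fixed 128-byte header purely for illustration.

package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

const hdrSize = 128 // assumed header length, for illustration only

// cacheHdr copies the first hdrSize bytes of pktPath into pktPath+".hdr",
// going through a temporary file and a rename so readers never observe a
// partially written header.
func cacheHdr(pktPath string) error {
	src, err := os.Open(pktPath)
	if err != nil {
		return err
	}
	defer src.Close()
	tmp, err := os.CreateTemp(filepath.Dir(pktPath), "hdr.")
	if err != nil {
		return err
	}
	if _, err = io.CopyN(tmp, src, hdrSize); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err = tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return err
	}
	return os.Rename(tmp.Name(), pktPath+".hdr")
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: cachehdr <pkt>")
		os.Exit(1)
	}
	if err := cacheHdr(os.Args[1]); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
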
index 5c98bd7e14916427c236553fea506f39f9668d61..25a494626a9d403e93421b9e36f371fa8076f87d 100644 (file)
--- a/src/log.go
+++ b/src/log.go
@@ -43,6 +43,8 @@ func (les LEs) Rec() string {
                switch v := le.V.(type) {
                case int, int8, uint8, int64, uint64:
                        val = fmt.Sprintf("%d", v)
+               case bool:
+                       val = fmt.Sprintf("%v", v)
                default:
                        val = fmt.Sprintf("%s", v)
                }
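
The log-record formatter Rec() gains a bool branch in its value type switch (needed for fields such as the new NoCK flag). A tiny self-contained version of that formatting rule, with an illustrative helper name:

package main

import "fmt"

// fieldValue renders a log field the way the type switch above does:
// integers via %d, booleans via %v, everything else via %s.
func fieldValue(v interface{}) string {
	switch v := v.(type) {
	case int, int8, uint8, int64, uint64:
		return fmt.Sprintf("%d", v)
	case bool:
		return fmt.Sprintf("%v", v)
	default:
		return fmt.Sprintf("%s", v)
	}
}

func main() {
	fmt.Println(fieldValue(42), fieldValue(true), fieldValue("rx"))
}
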
index 954dad18932992fa51489635c9bd27ddcf7ac9ee..16e5b9abc49de921e918a3a4051ea5778b4c7f05 100644 (file)
--- a/src/nncp.go
+++ b/src/nncp.go
@@ -37,8 +37,10 @@ You should have received a copy of the GNU General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.`
 )
 
+const Base32Encoded32Len = 52
+
 var (
-       Version string = "6.0.0"
+       Version string = "6.1.0"
 
        Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding)
 )
index 4dea1bf3713ab939c0f17fa8c37b267ebd82a9f2..c2715e4928d585f7fb9541af01802f87d530f337 100644 (file)
--- a/src/pkt.go
+++ b/src/pkt.go
@@ -192,14 +192,14 @@ func PktEncWrite(
        size, padSize int64,
        data io.Reader,
        out io.Writer,
-) error {
+) ([]byte, error) {
        pubEph, prvEph, err := box.GenerateKey(rand.Reader)
        if err != nil {
-               return err
+               return nil, err
        }
        var pktBuf bytes.Buffer
        if _, err := xdr.Marshal(&pktBuf, pkt); err != nil {
-               return err
+               return nil, err
        }
        tbs := PktTbs{
                Magic:     MagicNNCPEv4,
@@ -210,7 +210,7 @@ func PktEncWrite(
        }
        var tbsBuf bytes.Buffer
        if _, err = xdr.Marshal(&tbsBuf, &tbs); err != nil {
-               return err
+               return nil, err
        }
        signature := new([ed25519.SignatureSize]byte)
        copy(signature[:], ed25519.Sign(our.SignPrv, tbsBuf.Bytes()))
@@ -222,26 +222,31 @@ func PktEncWrite(
                ExchPub:   *pubEph,
                Sign:      *signature,
        }
-       if _, err = xdr.Marshal(out, &pktEnc); err != nil {
-               return err
+       tbsBuf.Reset()
+       if _, err = xdr.Marshal(&tbsBuf, &pktEnc); err != nil {
+               return nil, err
+       }
+       pktEncRaw := tbsBuf.Bytes()
+       if _, err = out.Write(pktEncRaw); err != nil {
+               return nil, err
        }
        sharedKey := new([32]byte)
        curve25519.ScalarMult(sharedKey, prvEph, their.ExchPub)
        kdf, err := blake2b.NewXOF(KDFXOFSize, sharedKey[:])
        if err != nil {
-               return err
+               return nil, err
        }
        if _, err = kdf.Write(MagicNNCPEv4[:]); err != nil {
-               return err
+               return nil, err
        }
 
        key := make([]byte, chacha20poly1305.KeySize)
        if _, err = io.ReadFull(kdf, key); err != nil {
-               return err
+               return nil, err
        }
        aead, err := chacha20poly1305.New(key)
        if err != nil {
-               return err
+               return nil, err
        }
        nonce := make([]byte, aead.NonceSize())
 
@@ -249,31 +254,31 @@ func PktEncWrite(
        sizeBuf := make([]byte, 8+aead.Overhead())
        binary.BigEndian.PutUint64(sizeBuf, uint64(sizeWithTags(int64(fullSize))))
        if _, err = out.Write(aead.Seal(sizeBuf[:0], nonce, sizeBuf[:8], nil)); err != nil {
-               return err
+               return nil, err
        }
 
        lr := io.LimitedReader{R: data, N: size}
        mr := io.MultiReader(&pktBuf, &lr)
        written, err := aeadProcess(aead, nonce, true, mr, out)
        if err != nil {
-               return err
+               return nil, err
        }
        if written != fullSize {
-               return io.ErrUnexpectedEOF
+               return nil, io.ErrUnexpectedEOF
        }
        if padSize > 0 {
                if _, err = io.ReadFull(kdf, key); err != nil {
-                       return err
+                       return nil, err
                }
                kdf, err = blake2b.NewXOF(blake2b.OutputLengthUnknown, key)
                if err != nil {
-                       return err
+                       return nil, err
                }
                if _, err = io.CopyN(out, kdf, padSize); err != nil {
-                       return err
+                       return nil, err
                }
        }
-       return nil
+       return pktEncRaw, nil
 }
 
 func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) (bool, error) {
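
PktEncWrite() now returns the serialized encrypted header in addition to writing it to the output stream, so callers can cache it as a ".hdr" file without re-reading the packet. The sketch below shows the same write-once-and-return pattern with a stand-in header struct; the struct, its fields and the binary encoding are illustrative only (nncp itself marshals its PktEnc with XDR).

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

// header is a stand-in for an encrypted-packet header: fixed-size fields
// only, so encoding/binary can marshal it deterministically.
type header struct {
	Magic [8]byte
	Nice  uint8
}

// writeHeader marshals hdr, writes it to out, and returns the very same
// raw bytes so the caller can cache them later without re-reading.
func writeHeader(hdr *header, out io.Writer) ([]byte, error) {
	var buf bytes.Buffer
	if err := binary.Write(&buf, binary.BigEndian, hdr); err != nil {
		return nil, err
	}
	raw := buf.Bytes()
	if _, err := out.Write(raw); err != nil {
		return nil, err
	}
	return raw, nil
}

func main() {
	hdr := header{Magic: [8]byte{'E', 'X', 'A', 'M', 'P', 'L', 'E', 0}, Nice: 96}
	raw, err := writeHeader(&hdr, os.Stdout)
	if err != nil {
		panic(err)
	}
	fmt.Fprintf(os.Stderr, "cached %d header bytes\n", len(raw))
}
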
index cbf781887f7b7039fbe43cdfcc5b87497472de7c..079acbd794622cc0763d5eb258846a47182b96f2 100644 (file)
--- a/src/pkt_test.go
+++ b/src/pkt_test.go
@@ -44,7 +44,7 @@ func TestPktEncWrite(t *testing.T) {
                if err != nil {
                        panic(err)
                }
-               err = PktEncWrite(
+               _, err = PktEncWrite(
                        nodeOur,
                        nodeTheir.Their(),
                        pkt,
@@ -95,7 +95,7 @@ func TestPktEncRead(t *testing.T) {
                if err != nil {
                        panic(err)
                }
-               err = PktEncWrite(
+               _, err = PktEncWrite(
                        node1,
                        node2.Their(),
                        pkt,
index 62e0f01b6ff77bc44e3bb18a64ebb2d5c17281df..ca1f361a09fbb4e63b26c6b13e72d8013a2d5595 100644 (file)
--- a/src/progress.go
+++ b/src/progress.go
@@ -141,7 +141,7 @@ func Progress(prefix string, les LEs) {
                progressBarsLock.Unlock()
        }
        what := pkt
-       if len(what) >= 52 { // Base32 encoded
+       if len(what) >= Base32Encoded32Len { // Base32 encoded
                what = what[:16] + ".." + what[len(what)-16:]
        }
        what = prefix + " " + what
index 3c6d2d4bfbfc1f7535f883829e30a7203dd57da8..6f1af9ff7e2ed6815b4740afdf55946f1cce48c9 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -21,6 +21,7 @@ import (
        "bytes"
        "crypto/subtle"
        "errors"
+       "hash"
        "io"
        "os"
        "path/filepath"
@@ -30,6 +31,7 @@ import (
 
        xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/flynn/noise"
+       "golang.org/x/crypto/blake2b"
 )
 
 const (
@@ -38,6 +40,11 @@ const (
        SPHeadOverhead = 4
 )
 
+type SPCheckerQueues struct {
+       appeared chan *[32]byte
+       checked  chan *[32]byte
+}
+
 var (
        MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
 
@@ -56,9 +63,19 @@ var (
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
 
-       spCheckerToken chan struct{}
+       spCheckers = make(map[NodeId]*SPCheckerQueues)
 )
 
+type FdAndFullSize struct {
+       fd       *os.File
+       fullSize int64
+}
+
+type HasherAndOffset struct {
+       h      hash.Hash
+       offset uint64
+}
+
 type SPType uint8
 
 const (
@@ -148,8 +165,6 @@ func init() {
                panic(err)
        }
        SPFileOverhead = buf.Len()
-       spCheckerToken = make(chan struct{}, 1)
-       spCheckerToken <- struct{}{}
 }
 
 func MarshalSP(typ SPType, sp interface{}) []byte {
@@ -183,6 +198,7 @@ type SPState struct {
        Ctx            *Ctx
        Node           *Node
        Nice           uint8
+       NoCK           bool
        onlineDeadline time.Duration
        maxOnlineTime  time.Duration
        hs             *noise.HandshakeState
@@ -214,6 +230,9 @@ type SPState struct {
        listOnly       bool
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
+       fds            map[string]FdAndFullSize
+       fileHashers    map[string]*HasherAndOffset
+       checkerQueues  SPCheckerQueues
        sync.RWMutex
 }
 
@@ -235,6 +254,11 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
+       go func() {
+               for _, s := range state.fds {
+                       s.fd.Close()
+               }
+       }()
 }
 
 func (state *SPState) NotAlive() bool {
@@ -251,6 +275,25 @@ func (state *SPState) dirUnlock() {
        state.Ctx.UnlockDir(state.txLock)
 }
 
+func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
+       for hshValue := range appeared {
+               les := LEs{
+                       {"XX", string(TRx)},
+                       {"Node", nodeId},
+                       {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
+               }
+               ctx.LogD("sp-checker", les, "checking")
+               size, err := ctx.CheckNoCK(nodeId, hshValue)
+               les = append(les, LE{"Size", size})
+               if err != nil {
+                       ctx.LogE("sp-checker", les, err, "")
+                       continue
+               }
+               ctx.LogI("sp-done", les, "")
+               go func(hsh *[32]byte) { checked <- hsh }(hshValue)
+       }
+}
+
 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
        state.writeSPBuf.Reset()
        n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
@@ -292,7 +335,6 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [
        var infos []*SPInfo
        var totalSize int64
        for job := range ctx.Jobs(nodeId, TTx) {
-               job.Fd.Close() // #nosec G104
                if job.PktEnc.Nice > nice {
                        continue
                }
@@ -445,6 +487,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.infosTheir = make(map[[32]byte]*SPInfo)
        state.started = started
        state.xxOnly = xxOnly
+
        var buf []byte
        var payload []byte
        state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
@@ -530,17 +573,60 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        return err
 }
 
+func (state *SPState) closeFd(pth string) {
+       s, exists := state.fds[pth]
+       delete(state.fds, pth)
+       if exists {
+               s.fd.Close()
+       }
+}
+
 func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
        payload []byte,
 ) error {
        les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
+       state.fds = make(map[string]FdAndFullSize)
+       state.fileHashers = make(map[string]*HasherAndOffset)
        state.isDead = make(chan struct{})
        if state.maxOnlineTime > 0 {
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
        }
 
+       // Checker
+       if !state.NoCK {
+               queues := spCheckers[*state.Node.Id]
+               if queues == nil {
+                       queues = &SPCheckerQueues{
+                               appeared: make(chan *[32]byte),
+                               checked:  make(chan *[32]byte),
+                       }
+                       spCheckers[*state.Node.Id] = queues
+                       go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
+               }
+               state.checkerQueues = *queues
+               go func() {
+                       for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+                               if job.PktEnc.Nice <= state.Nice {
+                                       state.checkerQueues.appeared <- job.HshValue
+                               }
+                       }
+               }()
+               state.wg.Add(1)
+               go func() {
+                       defer state.wg.Done()
+                       for {
+                               select {
+                               case <-state.isDead:
+                                       return
+                               case hsh := <-state.checkerQueues.checked:
+                                       state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+                               }
+                       }
+               }()
+       }
+
        // Remaining handshake payload sending
        if len(infosPayloads) > 1 {
                state.wg.Add(1)
@@ -683,22 +769,29 @@ func (state *SPState) StartWorkers(
                                        {"Size", int64(freq.Offset)},
                                }...)
                                state.Ctx.LogD("sp-file", lesp, "queueing")
-                               fd, err := os.Open(filepath.Join(
+                               pth := filepath.Join(
                                        state.Ctx.Spool,
                                        state.Node.Id.String(),
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
-                               ))
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
-                               }
-                               fi, err := fd.Stat()
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
+                               )
+                               fdAndFullSize, exists := state.fds[pth]
+                               if !exists {
+                                       fd, err := os.Open(pth)
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file", lesp, err, "")
+                                               return
+                                       }
+                                       fi, err := fd.Stat()
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file", lesp, err, "")
+                                               return
+                                       }
+                                       fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fds[pth] = fdAndFullSize
                                }
-                               fullSize := fi.Size()
+                               fd := fdAndFullSize.fd
+                               fullSize := fdAndFullSize.fullSize
                                var buf []byte
                                if freq.Offset < uint64(fullSize) {
                                        state.Ctx.LogD("sp-file", lesp, "seeking")
@@ -715,7 +808,7 @@ func (state *SPState) StartWorkers(
                                        buf = buf[:n]
                                        state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
                                }
-                               fd.Close() // #nosec G104
+                               state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
                                        Hash:    freq.Hash,
                                        Offset:  freq.Offset,
@@ -862,8 +955,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Lock()
                        state.queueTheir = nil
                        state.Unlock()
+
                case SPTypePing:
                        state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+
                case SPTypeInfo:
                        infosGot = true
                        lesp := append(les, LE{"Type", "info"})
@@ -910,6 +1005,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                }
                                continue
                        }
+                       if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
+                               state.Ctx.LogI("sp-info", lesp, "still non checksummed")
+                               continue
+                       }
                        fi, err := os.Stat(pktPath + PartSuffix)
                        var offset int64
                        if err == nil {
@@ -926,6 +1025,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        SPFreq{info.Hash, uint64(offset)},
                                ))
                        }
+
                case SPTypeFile:
                        lesp := append(les, LE{"Type", "file"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -945,29 +1045,60 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                string(TRx),
                        )
                        filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+                       filePathPart := filePath + PartSuffix
                        state.Ctx.LogD("sp-file", lesp, "opening part")
-                       fd, err := os.OpenFile(
-                               filePath+PartSuffix,
-                               os.O_RDWR|os.O_CREATE,
-                               os.FileMode(0666),
-                       )
-                       if err != nil {
-                               state.Ctx.LogE("sp-file", lesp, err, "")
-                               return nil, err
+                       fdAndFullSize, exists := state.fds[filePathPart]
+                       var fd *os.File
+                       if exists {
+                               fd = fdAndFullSize.fd
+                       } else {
+                               fd, err = os.OpenFile(
+                                       filePathPart,
+                                       os.O_RDWR|os.O_CREATE,
+                                       os.FileMode(0666),
+                               )
+                               if err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "")
+                                       return nil, err
+                               }
+                               state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               if file.Offset == 0 {
+                                       h, err := blake2b.New256(nil)
+                                       if err != nil {
+                                               panic(err)
+                                       }
+                                       state.fileHashers[filePath] = &HasherAndOffset{h: h}
+                               }
                        }
                        state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
                        if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "")
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                return nil, err
                        }
                        state.Ctx.LogD("sp-file", lesp, "writing")
-                       _, err = fd.Write(file.Payload)
-                       if err != nil {
+                       if _, err = fd.Write(file.Payload); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "")
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                return nil, err
                        }
+                       hasherAndOffset, hasherExists := state.fileHashers[filePath]
+                       if hasherExists {
+                               if hasherAndOffset.offset == file.Offset {
+                                       if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+                                               panic(err)
+                                       }
+                                       hasherAndOffset.offset += uint64(len(file.Payload))
+                               } else {
+                                       state.Ctx.LogE(
+                                               "sp-file", lesp,
+                                               errors.New("offset differs"),
+                                               "deleting hasher",
+                                       )
+                                       delete(state.fileHashers, filePath)
+                                       hasherExists = false
+                               }
+                       }
                        ourSize := int64(file.Offset + uint64(len(file.Payload)))
                        lesp[len(lesp)-1].V = ourSize
                        fullsize := int64(0)
@@ -982,51 +1113,71 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
-                               fd.Close() // #nosec G104
                                continue
                        }
-                       <-spCheckerToken
-                       go func() {
-                               defer func() {
-                                       spCheckerToken <- struct{}{}
-                               }()
-                               if err := fd.Sync(); err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       fd.Close() // #nosec G104
-                                       return
-                               }
-                               state.wg.Add(1)
-                               defer state.wg.Done()
-                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                                       fd.Close() // #nosec G104
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
-                               }
-                               state.Ctx.LogD("sp-file", lesp, "checking")
-                               gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
-                               fd.Close() // #nosec G104
-                               if err != nil || !gut {
+                       err = fd.Sync()
+                       if err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "sync")
+                               state.closeFd(filePathPart)
+                               continue
+                       }
+                       if hasherExists {
+                               if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
                                        state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
-                                       return
+                                       continue
                                }
-                               state.Ctx.LogI("sp-done", lesp, "")
-                               if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
+                               if err = os.Rename(filePathPart, filePath); err != nil {
                                        state.Ctx.LogE("sp-file", lesp, err, "rename")
-                                       return
+                                       continue
                                }
                                if err = DirSync(dirToSync); err != nil {
                                        state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       return
+                                       continue
                                }
-                               state.Lock()
-                               delete(state.infosTheir, *file.Hash)
-                               state.Unlock()
+                               state.Ctx.LogI("sp-file", lesp, "done")
                                state.wg.Add(1)
                                go func() {
                                        state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
                                        state.wg.Done()
                                }()
-                       }()
+                               state.Lock()
+                               delete(state.infosTheir, *file.Hash)
+                               state.Unlock()
+                               if !state.Ctx.HdrUsage {
+                                       state.closeFd(filePathPart)
+                                       continue
+                               }
+                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "seek")
+                                       state.closeFd(filePathPart)
+                                       continue
+                               }
+                               _, pktEncRaw, err := state.Ctx.HdrRead(fd)
+                               state.closeFd(filePathPart)
+                               if err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
+                                       continue
+                               }
+                               state.Ctx.HdrWrite(pktEncRaw, filePath)
+                               continue
+                       }
+                       state.closeFd(filePathPart)
+                       if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "rename")
+                               continue
+                       }
+                       if err = DirSync(dirToSync); err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "sync")
+                               continue
+                       }
+                       state.Ctx.LogI("sp-file", lesp, "downloaded")
+                       state.Lock()
+                       delete(state.infosTheir, *file.Hash)
+                       state.Unlock()
+                       if !state.NoCK {
+                               state.checkerQueues.appeared <- file.Hash
+                       }
+
                case SPTypeDone:
                        lesp := append(les, LE{"Type", "done"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1036,19 +1187,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                return nil, err
                        }
                        lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
+                       lesp = append(lesp, LE{"XX", string(TTx)})
                        state.Ctx.LogD("sp-done", lesp, "removing")
-                       err := os.Remove(filepath.Join(
+                       pth := filepath.Join(
                                state.Ctx.Spool,
                                state.Node.Id.String(),
                                string(TTx),
                                Base32Codec.EncodeToString(done.Hash[:]),
-                       ))
-                       lesp = append(lesp, LE{"XX", string(TTx)})
-                       if err == nil {
+                       )
+                       if err = os.Remove(pth); err == nil {
                                state.Ctx.LogI("sp-done", lesp, "")
+                               if state.Ctx.HdrUsage {
+                                       os.Remove(pth + HdrSuffix)
+                               }
                        } else {
                                state.Ctx.LogE("sp-done", lesp, err, "")
                        }
+
                case SPTypeFreq:
                        lesp := append(les, LE{"Type", "freq"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1081,6 +1236,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        } else {
                                state.Ctx.LogD("sp-process", lesp, "unknown")
                        }
+
                default:
                        state.Ctx.LogE(
                                "sp-process",
index e1e912745c0dab1b4e26778633a9f370403ab59a..b99ef3d49a9a4d36bc6d1b39c310d50bf9d01722 100644 (file)
--- a/src/tmp.go
+++ b/src/tmp.go
@@ -92,6 +92,10 @@ func DirSync(dirPath string) error {
        return fd.Close()
 }
 
+func (tmp *TmpFileWHash) Checksum() string {
+       return Base32Codec.EncodeToString(tmp.Hsh.Sum(nil))
+}
+
 func (tmp *TmpFileWHash) Commit(dir string) error {
        var err error
        if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
@@ -108,7 +112,7 @@ func (tmp *TmpFileWHash) Commit(dir string) error {
        if err = tmp.Fd.Close(); err != nil {
                return err
        }
-       checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil))
+       checksum := tmp.Checksum()
        tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit")
        if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil {
                return err
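
The new Checksum() method exposes the Base32 digest that Commit() already uses as the final file name, so tx.go can learn where a committed packet ended up. A standalone sketch of the overall hash-while-writing-then-rename idea; function and variable names are illustrative, not nncp's API.

package main

import (
	"encoding/base32"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"

	"golang.org/x/crypto/blake2b"
)

var codec = base32.StdEncoding.WithPadding(base32.NoPadding)

// commitWithChecksum streams data through a BLAKE2b-256 hasher into a
// temporary file, then renames the file to the Base32 form of its digest
// and returns that name.
func commitWithChecksum(dir string, data io.Reader) (string, error) {
	tmp, err := os.CreateTemp(dir, "tmp.")
	if err != nil {
		return "", err
	}
	h, err := blake2b.New256(nil)
	if err != nil {
		panic(err)
	}
	if _, err = io.Copy(io.MultiWriter(tmp, h), data); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return "", err
	}
	if err = tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return "", err
	}
	checksum := codec.EncodeToString(h.Sum(nil))
	return checksum, os.Rename(tmp.Name(), filepath.Join(dir, checksum))
}

func main() {
	name, err := commitWithChecksum(os.TempDir(), strings.NewReader("payload"))
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
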
index cc49784b335ffb7bfdecf1dd395b17fac68c5ca1..576abb1fa3553346ee88d9705e970c47de116080 100644 (file)
--- a/src/toss.go
+++ b/src/toss.go
@@ -83,23 +83,24 @@ func (ctx *Ctx) Toss(
        }
        defer decompressor.Close()
        for job := range ctx.Jobs(nodeId, TRx) {
-               pktName := filepath.Base(job.Fd.Name())
+               pktName := filepath.Base(job.Path)
                les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
                if job.PktEnc.Nice > nice {
                        ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
-                       job.Fd.Close() // #nosec G104
                        continue
                }
+               fd, err := os.Open(job.Path)
+               if err != nil {
+                       ctx.LogE("rx", les, err, "open")
+                       isBad = true
+                       continue
+               }
+
                pipeR, pipeW := io.Pipe()
                go func(job Job) error {
                        pipeWB := bufio.NewWriter(pipeW)
-                       _, _, err := PktEncRead(
-                               ctx.Self,
-                               ctx.Neigh,
-                               bufio.NewReader(job.Fd),
-                               pipeWB,
-                       )
-                       job.Fd.Close() // #nosec G104
+                       _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
+                       fd.Close() // #nosec G104
                        if err != nil {
                                return pipeW.CloseWithError(err)
                        }
@@ -109,7 +110,6 @@ func (ctx *Ctx) Toss(
                        return pipeW.Close()
                }(job)
                var pkt Pkt
-               var err error
                var pktSize int64
                var pktSizeBlocks int64
                if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
@@ -197,13 +197,15 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
+                               } else if ctx.HdrUsage {
+                                       os.Remove(job.Path + HdrSuffix)
                                }
                        }
                case PktTypeFile:
@@ -293,13 +295,15 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
+                               } else if ctx.HdrUsage {
+                                       os.Remove(job.Path + HdrSuffix)
                                }
                                if len(sendmail) > 0 && ctx.NotifyFile != nil {
                                        cmd := exec.Command(
@@ -362,13 +366,15 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
+                               } else if ctx.HdrUsage {
+                                       os.Remove(job.Path + HdrSuffix)
                                }
                                if len(sendmail) > 0 && ctx.NotifyFreq != nil {
                                        cmd := exec.Command(
@@ -408,13 +414,15 @@ func (ctx *Ctx) Toss(
                        ctx.LogI("rx", les, "")
                        if !dryRun {
                                if doSeen {
-                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                       if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
                                                fd.Close() // #nosec G104
                                        }
                                }
-                               if err = os.Remove(job.Fd.Name()); err != nil {
+                               if err = os.Remove(job.Path); err != nil {
                                        ctx.LogE("rx", les, err, "remove")
                                        isBad = true
+                               } else if ctx.HdrUsage {
+                                       os.Remove(job.Path + HdrSuffix)
                                }
                        }
                default:
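
Tossing now opens packets lazily through job.Path and, when header caching is enabled, also removes the cached ".hdr" next to a successfully processed packet. A minimal sketch of that cleanup step; the helper name is illustrative.

package main

import (
	"fmt"
	"os"
)

// removePktWithHdr deletes a processed packet and, best effort, the
// cached header written next to it; a missing .hdr is not an error.
func removePktWithHdr(path string, hdrUsage bool) error {
	if err := os.Remove(path); err != nil {
		return err
	}
	if hdrUsage {
		os.Remove(path + ".hdr") // ignore error: the cache may simply be absent
	}
	return nil
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: rmpkt <pkt>")
		os.Exit(1)
	}
	if err := removePktWithHdr(os.Args[1], true); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
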
index 8a16dd94512f184c87e60aef65e88f30c0f629be..807c49a9ea55c8222a0fa9c7c3ada15faf03bbd1 100644 (file)
--- a/src/toss_test.go
+++ b/src/toss_test.go
@@ -383,7 +383,12 @@ func TestTossFreq(t *testing.T) {
                }
                for job := range ctx.Jobs(ctx.Self.Id, TTx) {
                        var buf bytes.Buffer
-                       _, _, err := PktEncRead(ctx.Self, ctx.Neigh, job.Fd, &buf)
+                       fd, err := os.Open(job.Path)
+                       if err != nil {
+                               t.Error(err)
+                               return false
+                       }
+                       _, _, err = PktEncRead(ctx.Self, ctx.Neigh, fd, &buf)
                        if err != nil {
                                t.Error(err)
                                return false
@@ -454,7 +459,7 @@ func TestTossTrns(t *testing.T) {
                        }
                        copy(pktTrans.Path[:], nodeOur.Id[:])
                        var dst bytes.Buffer
-                       if err := PktEncWrite(
+                       if _, err := PktEncWrite(
                                ctx.Self,
                                ctx.Neigh[*nodeOur.Id],
                                &pktTrans,
index c89957b333884a245793d49c6c72001b51b603aa..3e57f9b865b1a56d99a0623edd602942f17a1dcf 100644 (file)
--- a/src/tx.go
+++ b/src/tx.go
@@ -62,7 +62,9 @@ func (ctx *Ctx) Tx(
        }
        expectedSize := size
        for i := 0; i < len(hops); i++ {
-               expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
+               expectedSize = PktEncOverhead +
+                       PktSizeOverhead +
+                       sizeWithTags(PktOverhead+expectedSize)
        }
        padSize := minSize - expectedSize
        if padSize < 0 {
@@ -79,16 +81,23 @@ func (ctx *Ctx) Tx(
        errs := make(chan error)
        curSize := size
        pipeR, pipeW := io.Pipe()
+       var pktEncRaw []byte
        go func(size int64, src io.Reader, dst io.WriteCloser) {
                ctx.LogD("tx", LEs{
                        {"Node", hops[0].Id},
                        {"Nice", int(nice)},
                        {"Size", size},
                }, "wrote")
-               errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
+               pktEncRaw, err = PktEncWrite(
+                       ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+               )
+               errs <- err
                dst.Close() // #nosec G104
        }(curSize, src, pipeW)
-       curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
+       curSize = PktEncOverhead +
+               PktSizeOverhead +
+               sizeWithTags(PktOverhead+curSize) +
+               padSize
 
        var pipeRPrev io.Reader
        for i := 1; i < len(hops); i++ {
@@ -101,7 +110,8 @@ func (ctx *Ctx) Tx(
                                {"Nice", int(nice)},
                                {"Size", size},
                        }, "trns wrote")
-                       errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+                       _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+                       errs <- err
                        dst.Close() // #nosec G104
                }(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
                curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
@@ -124,6 +134,12 @@ func (ctx *Ctx) Tx(
        nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
        err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
        os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
+       if err != nil {
+               return lastNode, err
+       }
+       if ctx.HdrUsage {
+               ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
+       }
        return lastNode, err
 }
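
The reformatted expectedSize loop makes the per-hop growth explicit: each transitional hop wraps the previous packet and adds its own encryption and size overheads. The sketch below reproduces that loop with placeholder constants; none of the numbers are nncp's real overhead values.

package main

import "fmt"

// Placeholder overheads, for illustration only (not nncp's real values).
const (
	pktOverhead     = 64
	pktEncOverhead  = 128
	pktSizeOverhead = 24
	chunkSize       = 128 << 10 // assumed authenticated-chunk size
	tagLen          = 16
)

// sizeWithTags adds one authentication tag per (possibly partial) chunk.
func sizeWithTags(size int64) int64 {
	tags := size / chunkSize
	if size%chunkSize != 0 {
		tags++
	}
	return size + tags*tagLen
}

// expectedSize applies the same per-hop wrapping as the loop above.
func expectedSize(payload int64, hops int) int64 {
	size := payload
	for i := 0; i < hops; i++ {
		size = pktEncOverhead + pktSizeOverhead + sizeWithTags(pktOverhead+size)
	}
	return size
}

func main() {
	fmt.Println(expectedSize(1<<20, 3)) // size after three hops of wrapping
}
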
 
index ab0c1c6c8f3a6c6f49ee24772a40aee3dcabba10..7e07a0524a1d0a5837e7ce9033c2ea229d18e4e4 100644 (file)
--- a/src/tx_test.go
+++ b/src/tx_test.go
@@ -98,9 +98,13 @@ func TestTx(t *testing.T) {
                        return false
                }
                txJob := sentJobs[0]
-               defer txJob.Fd.Close()
+               fd, err := os.Open(txJob.Path)
+               if err != nil {
+                       panic(err)
+               }
+               defer fd.Close()
                var bufR bytes.Buffer
-               if _, err = io.Copy(&bufR, txJob.Fd); err != nil {
+               if _, err = io.Copy(&bufR, fd); err != nil {
                        panic(err)
                }
                var bufW bytes.Buffer