Please send questions regarding the use of NNCP, bug reports and
patches to nncp-devel mailing list:
-https://lists.cypherpunks.ru/pipermail/nncp-devel/
+http://lists.cypherpunks.ru/nncp_002ddevel.html
Development Git source code repository currently is located here:
http://www.git.cypherpunks.ru/?p=nncp.git;a=summary
Пожалуйста все вопросы касающиеся использования NNCP, отчёты об ошибках
и патчи отправляйте в nncp-devel почтовую рассылку:
-https://lists.cypherpunks.ru/pipermail/nncp-devel/
+http://lists.cypherpunks.ru/nncp_002ddevel.html
Исходный код для разработчика находится в Git репозитории:
http://www.git.cypherpunks.ru/?p=nncp.git;a=summary
An example @url{https://www.newsyslog.org/manual.html, newsyslog} entry:
@example
-/var/spool/nncp/log 644 7 100 * CYN
+/var/spool/nncp/log 644 7 100 * BCYN
@end example
@item
{
cron: "*/5 * * * * * *"
when-tx-exists: true
+ nock: true
},
]
@end verbatim
@item when-tx-exists
Call only if packets for sending exist.
+@anchor{CfgNoCK}
+@item nock
+NoCK (no-checksumming) tells NNCP not to checksum received files,
+assuming that it will be done later, for example with the
+@ref{nncp-check} command. That can help minimize time spent online,
+because the HDD does not have to simultaneously read data back for
+checksumming and write the data being received, but can just write the
+file sequentially. Pay attention that you have to call the remote node
+again after checksumming is done, to send the notification about
+successful packet reception.
+
@end table
log: /var/spool/nncp/log
umask: "022"
noprogress: true
+ nohdr: true
notify: {
file: {
commands by default. You can always force showing it with the
@option{-progress} command line option anyway.
+@anchor{CfgNoHdr}
+@strong{nohdr} option disables @ref{HdrFile, .hdr} files usage.
+
@anchor{CfgNotify}
@strong{notify} section contains notification settings for successfully
tossed file, freq and exec packets. Corresponding @strong{from} and
[-rxrate INT]
[-txrate INT]
[-autotoss*]
+ [-nock]
NODE[:ADDR] [FORCEADDR]
@end example
If the @option{-rx} option is specified then only inbound packet
transmission is performed. If the @option{-tx} option is specified, then
-only outbound transmission is performed. @option{-onlinedeadline}
-overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}.
-@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime,
-@emph{maxonlinetime}}. @option{-rxrate}/@option{-txrate} override
-@ref{CfgXxRate, rxrate/txrate}. @option{-list} option allows you to list
-packets of remote node, without any transmission.
+only outbound transmission is performed.
-You can specify what packets your want to download, by specifying
-@option{-pkts} option with comma-separated list of packets identifiers.
+@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}.
+@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, @emph{maxonlinetime}}.
+@option{-rxrate}/@option{-txrate} override @ref{CfgXxRate, rxrate/txrate}.
+Read @ref{CfgNoCK, more} about @option{-nock} option.
+
+@option{-list} option allows you to list the remote node's packets
+without any transmission. You can specify which packets you want to
+download with the @option{-pkts} option, taking a comma-separated list
+of packet identifiers.
Each @option{NODE} can contain several uniquely identified
@option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do
@section nncp-check
@example
-$ nncp-check [options]
+$ nncp-check [-nock] [options]
@end example
Perform @ref{Spool, spool} directory integrity check. Read all files
that have Base32-encoded filenames and compare them with the recalculated
-BLAKE2b hash output of their contents. That supplementary command is
-not used often in practice, if ever.
+BLAKE2b hash output of their contents.
+
+The most useful mode of operation is with the @option{-nock} option,
+which checks the integrity of @file{.nock} files, renaming them to
+ordinary (verified) encrypted packets.
@node nncp-cronexpr
@section nncp-cronexpr
@section nncp-daemon
@example
-$ nncp-daemon [options] [-maxconn INT] [-bind ADDR] [-inetd] [-autotoss*]
+$ nncp-daemon [options]
+ [-maxconn INT] [-bind ADDR] [-inetd]
+ [-autotoss*] [-nock]
@end example
Start a listening TCP daemon, wait for incoming connections and run
during the call. All @option{-autotoss-*} options are the same as in the
@ref{nncp-toss} command.
+Read @ref{CfgNoCK, more} about @option{-nock} option.
+
@node nncp-exec
@section nncp-exec
$ nncp-rm [options] -lock
$ nncp-rm [options] -node NODE -part
$ nncp-rm [options] -node NODE -seen
+$ nncp-rm [options] -node NODE -nock
+$ nncp-rm [options] -node NODE -hdr
$ nncp-rm [options] -node NODE [-rx] [-tx]
$ nncp-rm [options] -node NODE -pkt PKT
@end example
failing to be processed.
@item When either @option{-rx} or @option{-tx} options are specified
-(maybe both of them), then delete all packets from that given queues. If
-@option{-part} is given, then delete only @file{.part}ly downloaded
-ones. If @option{-seen} option is specified, then delete only
-@file{.seen} files.
+(maybe both of them), then delete all packets from the given queues.
+@option{-part} option deletes @file{.part}ly downloaded files.
+@option{-seen} option deletes @file{.seen} files. @option{-nock} option
+deletes non-checksummed (non-verified) @file{.nock} files. @option{-hdr}
+option deletes @file{.hdr} files.
@item @option{-dryrun} option just prints what will be deleted.
@unnumbered Contacts
Please send questions regarding the use of NNCP, bug reports and patches to
-@url{https://lists.cypherpunks.ru/pipermail/nncp-devel/, nncp-devel}
+@url{http://lists.cypherpunks.ru/nncp_002ddevel.html, nncp-devel}
mailing list. Announcements also go to this mailing list.
Official website is @url{http://www.nncpgo.org/}.
@multitable {XXXXX} {XXXX-XX-XX} {XXXX KiB} {link sign} {xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx}
@headitem Version @tab Date @tab Size @tab Tarball @tab SHA256 checksum
+@item @ref{Release 6.0.0, 6.0.0} @tab 2021-01-23 @tab 1028 KiB
+@tab @url{download/nncp-6.0.0.tar.xz, link} @url{download/nncp-6.0.0.tar.xz.sig, sign}
+@tab @code{42FE8AA5 4520B3A1 ABB50D66 1BBBA6A1 41CE4E74 9B4816B0 D4C6845D 67465916}
+
@item @ref{Release 5.6.0, 5.6.0} @tab 2021-01-17 @tab 1024 KiB
@tab @url{download/nncp-5.6.0.tar.xz, link} @url{download/nncp-5.6.0.tar.xz.sig, sign}
@tab @code{1DC83F05 F14A3C3B 95820046 C60B170E B8C8936F 142A5B9A 1E943E6F 4CEFBDE3}
@example
/usr/local/etc/postfix/master.cf:
nncp unix - n n - - pipe
- flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
+ flags=FRqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
@end example
This runs the @command{nncp-exec} command to place outgoing mail into
-the NNCP queue after replacing @var{$nexthop} by the the receiving NNCP
+the NNCP queue after replacing @var{$nexthop} by the receiving NNCP
node and after replacing @var{$recipient} by the recipients. The
@command{pipe(8)} delivery agent executes the @command{nncp-exec}
command without assistance from the shell, so there are no problems with
shell meta characters in command-line parameters.
+Pay attention to the @code{flags}, containing @code{R}, which tells
+Postfix to include the @code{Return-Path:} header. Otherwise that
+envelope sender information will be lost. Possibly you will also need to
+preserve that header somehow on the receiving side, because the
+@command{sendmail} command will replace it. For example you can rename
+it before feeding it to @command{sendmail} with
+@code{reformail -R Return-Path: X-Original-Return-Path: | sendmail}, or
+extract it with:
+
+@verbatiminclude sendmail.sh
+
@item Specify that mail for @emph{example.com}, should be delivered via
NNCP, to a host named @emph{nncp-host}:
@example
/usr/local/etc/postfix/master.cf:
nncp unix - n n - - pipe
- flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
+ flags=Fqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient
@end example
This runs the @command{nncp-exec} command to place outgoing mail into
@node Новости
@section Новости
+@node Релиз 6.1.0
+@subsection Релиз 6.1.0
+@itemize
+
+@item
+Оптимизация: большинство команд теперь не держат открытыми файловые
+дескрипторы. Прежде вы легко могли выйти за пределы максимально
+допустимого количества открытых файлов, если у вас было много пакетов в
+spool директории.
+
+@item
+Оптимизация: не закрывать файловый дескриптор файла который мы качаем.
+Прежде каждый его кусочек приводил к дорогим open/close вызовам.
+
+@item
+Скачиваемые в режиме online файлы теперь сохраняются с @file{.nock}
+суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо
+online демоны не выполнят проверку целостности.
+
+@item
+Оптимизация: для файлов, скачивание которых не было продолжено, сразу же
+вычисляется контрольная сумма, пропуская промежуточный @file{.nock} шаг.
+
+@item
+Возможность хранения заголовков зашифрованных пакетов в @file{.hdr}
+файлах, рядом с самими пакетами. Это может существенно повысить скорость
+получения списка пакетов на файловых системах с большим размером блока.
+
+@end itemize
+
@node Релиз 6.0.0
@subsection Релиз 6.0.0
@itemize
See also this page @ref{Новости, in Russian}.
+@node Release 6.1.0
+@section Release 6.1.0
+@itemize
+
+@item
+Optimization: most commands do not keep file descriptors open now.
+Previously you could exceed the maximal number of open files if you had
+many packets in the spool directory.
+
+@item
+Optimization: do not close the file descriptor of the file we download
+online. Previously each chunk led to expensive open/close calls.
+
+@item
+Files downloaded online are saved with a @file{.nock} (non-checksummed)
+suffix, waiting for either @command{nncp-check} or the online daemons to
+perform the integrity check.
+
+@item
+Optimization: files that are not resumed are checksummed immediately
+during the online download, skipping the intermediate @file{.nock} step.
+
+@item
+Ability to store each encrypted packet's header in a @file{.hdr} file
+next to the packet itself. That can greatly increase the performance of
+packet listing on filesystems with a big block size.
+
+@end itemize
+
@node Release 6.0.0
@section Release 6.0.0
@itemize
-@item Log uses human readable and easy machine parseable
+@item
+Log uses human readable and easy machine parseable
@url{https://www.gnu.org/software/recutils/, recfile} format for the
records, instead of structured RFC 3339 lines. Old logs are not readable
by @command{nncp-log} anymore.
--- /dev/null
+#!/bin/sh -e
+
+tmp=`mktemp`
+trap "rm -f $tmp" HUP PIPE INT QUIT TERM EXIT
+cat > "$tmp"
+sendmail -f "`reformail -x Return-Path: < $tmp`" "$@" < "$tmp"
Spool directory holds @ref{Encrypted, encrypted packets} received from
remote nodes and queued for sending to them. It has the following
-example structure:
+example structure with just a single outbound (@code{tx}) packet
+@code{LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ} to the node
+@code{2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ}:
@example
-spool/tmp/
-spool/2WHB...OABQ/rx.lock
-spool/2WHB...OABQ/rx/5ZIB...UMKW.part
-spool/2WHB...OABQ/tx.lock
-spool/2WHB...OABQ/toss.lock
-spool/BYRR...CG6Q/rx.lock
-spool/BYRR...CG6Q/rx/
-spool/BYRR...CG6Q/tx.lock
-spool/BYRR...CG6Q/tx/AQUT...DGNT.seen
-spool/BYRR...CG6Q/tx/NSYY...ZUU6
-spool/BYRR...CG6Q/tx/VCSR...3VXX.seen
-spool/BYRR...CG6Q/tx/ZI5U...5RRQ
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/toss.lock
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx.lock
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx/
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx.lock
+spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx/LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ
+spool/tmp
@end example
-Except for @file{tmp}, all other directories are Base32-encoded node
-identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example).
-Each node subdirectory has @file{rx} (received, partially received and
-currently unprocessed packets) and @file{tx} (for outbound packets)
-directories.
-
-Each @file{rx}/@file{tx} directory contains one file per encrypted
-packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So
-it can be integrity checked at any time. @file{5ZIB...UMKW.part} is
-partially received file from @file{2WHB...OABQ} node. @file{tx}
-directory can not contain partially written files -- they are moved
-atomically from @file{tmp}.
-
-When @ref{nncp-toss} utility is called with @option{-seen} option, it
-will create empty @file{XXX.seen} files, telling that some kind of
-packet was already tossed sometime.
-
-Only one process can work with @file{rx}/@file{tx} directories at once,
-so there are corresponding lock files.
+@table @file
+
+@item tmp
+directory contains various temporary files that under normal
+circumstances are renamed into their final place inside the other
+directories. All directories in @file{spool} @strong{have to} be on the
+same filesystem for that renaming to work.
+
+@item 2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ
+is an example Base32-encoded neighbour identifier.
+
+@item rx, tx
+directories are for incoming and outgoing encrypted packets. @file{rx}
+contains packets that are currently unfinished, unchecked, unprocessed,
+and so on.
+
+@item toss.lock, rx.lock, tx.lock
+Lock files. Only a single process can work with the @file{rx}/@file{tx}
+directories at once.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ
+is an example @ref{Encrypted, encrypted packet}. Its filename is the
+Base32-encoded BLAKE2b hash of the whole contents, so it can be
+integrity checked at any time.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.part
+is an example @strong{partly} received file. It can appear only when
+online transfer is used. Its filename is sent by the remote side and
+plays no role until the file is fully downloaded.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.nock
+non-checksummed (NoCK) @strong{fully} received file. Its checksum is
+verified against its filename either by @ref{nncp-check} or by the
+running online daemons. If it is correct, then its extension is trimmed.
+
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.seen
+The @ref{nncp-toss} utility can be invoked with the @option{-seen}
+option, leading to the creation of @file{.seen} files, telling that the
+file with the specified hash has already been processed before. That can
+be useful in use-cases where multiple ways of packet transfer are
+available and duplicates may be received. You have to remove them
+manually when you do not need them anymore (probably because they have
+expired).
+
+@anchor{HdrFile}
+@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.hdr
+Unless the @ref{CfgNoHdr, nohdr} option is enabled in the configuration
+file, a @file{.hdr} file is automatically created for every ordinary
+(fully received and checksummed) packet. It literally contains just the
+header of the corresponding packet. It will be created even during a
+simple @ref{nncp-stat} call. On filesystems with a big block size (ZFS
+for example) it can greatly speed up listing the packets in directories,
+because it prevents unnecessary read amplification. On other filesystems
+it probably won't help at all, or may even harm performance.
+
+There is a hack: you can create a denser @file{.hdr} allocation by
+removing all @file{.hdr} files and then running @command{nncp-stat},
+which will recreate them. In many cases the @file{.hdr} files will then
+be allocated more or less linearly on the disk, decreasing listing time
+even more.
+
+@end table
Fingerprint: 92C2 F0AE FE73 208E 46BF F3DE 2B25 868E 75A1 A953
Please send questions regarding the use of NNCP, bug reports and patches
-to mailing list: https://lists.cypherpunks.ru/pipermail/nncp-devel/
+to mailing list: http://lists.cypherpunks.ru/nncp_002ddevel.html
EOF
cat <<EOF
Пожалуйста, все вопросы касающиеся использования NNCP, отчёты об ошибках
и патчи отправляйте в nncp-devel почтовую рассылку:
-https://lists.cypherpunks.ru/pipermail/nncp-devel/
+http://lists.cypherpunks.ru/nncp_002ddevel.html
EOF
-/var/spool/nncp/log 644 7 100 * CYN
+/var/spool/nncp/log 644 7 100 * BCYN
OnlineDeadline time.Duration
MaxOnlineTime time.Duration
WhenTxExists bool
+ NoCK bool
AutoToss bool
AutoTossDoSeen bool
rxRate, txRate int,
onlineDeadline, maxOnlineTime time.Duration,
listOnly bool,
+ noCK bool,
onlyPkts map[[32]byte]bool,
) (isGood bool) {
for _, addr := range addrs {
rxRate: rxRate,
txRate: txRate,
listOnly: listOnly,
+ NoCK: noCK,
onlyPkts: onlyPkts,
}
if err = state.StartI(conn); err == nil {
OnlineDeadline *uint `json:"onlinedeadline,omitempty"`
MaxOnlineTime *uint `json:"maxonlinetime,omitempty"`
WhenTxExists *bool `json:"when-tx-exists,omitempty"`
+ NoCK *bool `json:"nock,omitempty"`
AutoToss *bool `json:"autotoss,omitempty"`
AutoTossDoSeen *bool `json:"autotoss-doseen,omitempty"`
Umask string `json:"umask,omitempty"`
OmitPrgrs bool `json:"noprogress,omitempty"`
+ NoHdr bool `json:"nohdr,omitempty"`
Notify *NotifyJSON `json:"notify,omitempty"`
if callCfg.WhenTxExists != nil {
call.WhenTxExists = *callCfg.WhenTxExists
}
+ if callCfg.NoCK != nil {
+ call.NoCK = *callCfg.NoCK
+ }
if callCfg.AutoToss != nil {
call.AutoToss = *callCfg.AutoToss
}
if cfgJSON.OmitPrgrs {
showPrgrs = false
}
+ hdrUsage := !cfgJSON.NoHdr
ctx := Ctx{
Spool: spoolPath,
LogPath: logPath,
UmaskForce: umaskForce,
ShowPrgrs: showPrgrs,
+ HdrUsage: hdrUsage,
Self: self,
Neigh: make(map[NodeId]*Node, len(cfgJSON.Neigh)),
Alias: make(map[string]*NodeId),
"errors"
"io"
"log"
+ "os"
+ "path/filepath"
"golang.org/x/crypto/blake2b"
)
+const NoCKSuffix = ".nock"
+
func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error) {
hsh, err := blake2b.New256(nil)
if err != nil {
{"Pkt", Base32Codec.EncodeToString(job.HshValue[:])},
{"FullSize", job.Size},
}
- gut, err := Check(job.Fd, job.HshValue[:], les, ctx.ShowPrgrs)
- job.Fd.Close() // #nosec G104
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ ctx.LogE("check", les, err, "")
+ return true
+ }
+ gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs)
+ fd.Close() // #nosec G104
if err != nil {
ctx.LogE("check", les, err, "")
return true
func (ctx *Ctx) Check(nodeId *NodeId) bool {
return !(ctx.checkXxIsBad(nodeId, TRx) || ctx.checkXxIsBad(nodeId, TTx))
}
+
+func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[32]byte) (int64, error) {
+ dirToSync := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
+ pktName := Base32Codec.EncodeToString(hshValue[:])
+ pktPath := filepath.Join(dirToSync, pktName)
+ fd, err := os.Open(pktPath + NoCKSuffix)
+ if err != nil {
+ return 0, err
+ }
+ defer fd.Close()
+ fi, err := fd.Stat()
+ if err != nil {
+ return 0, err
+ }
+ size := fi.Size()
+ les := LEs{
+ {"XX", string(TRx)},
+ {"Node", nodeId},
+ {"Pkt", pktName},
+ {"FullSize", size},
+ }
+ gut, err := Check(fd, hshValue[:], les, ctx.ShowPrgrs)
+ if err != nil {
+ return 0, err
+ }
+ if !gut {
+ return 0, errors.New("checksum mismatch")
+ }
+ if err = os.Rename(pktPath+NoCKSuffix, pktPath); err != nil {
+ return 0, err
+ }
+ if err = DirSync(dirToSync); err != nil {
+ return size, err
+ }
+ if ctx.HdrUsage {
+ if _, err = fd.Seek(0, io.SeekStart); err != nil {
+ return size, err
+ }
+ _, pktEncRaw, err := ctx.HdrRead(fd)
+ if err != nil {
+ return size, err
+ }
+ ctx.HdrWrite(pktEncRaw, pktPath)
+ }
+ return size, err
+}
{K: "Pkt", V: "dummy"},
}
for job := range ctx.Jobs(&nodeId, nncp.TTx) {
- pktName = filepath.Base(job.Fd.Name())
+ pktName = filepath.Base(job.Path)
les[len(les)-1].V = pktName
if job.PktEnc.Nice > nice {
ctx.LogD("nncp-bundle", les, "too nice")
- job.Fd.Close() // #nosec G104
continue
}
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ log.Fatalln("Error during opening:", err)
+ }
if err = tarWr.WriteHeader(&tar.Header{
Format: tar.FormatUSTAR,
Name: nncp.NNCPBundlePrefix,
log.Fatalln("Error writing tar header:", err)
}
if _, err = nncp.CopyProgressed(
- tarWr, job.Fd, "Tx",
+ tarWr, bufio.NewReader(fd), "Tx",
append(les, nncp.LEs{
{K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
{K: "FullSize", V: job.Size},
); err != nil {
log.Fatalln("Error during copying to tar:", err)
}
- job.Fd.Close() // #nosec G104
+ if err = fd.Close(); err != nil {
+ log.Fatalln("Error during closing:", err)
+ }
if err = tarWr.Flush(); err != nil {
log.Fatalln("Error during tar flushing:", err)
}
log.Fatalln("Error during stdout flushing:", err)
}
if *doDelete {
- if err = os.Remove(job.Fd.Name()); err != nil {
+ if err = os.Remove(job.Path); err != nil {
log.Fatalln("Error during deletion:", err)
+ } else if ctx.HdrUsage {
+ os.Remove(job.Path + nncp.HdrSuffix)
}
}
ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "")
if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
ctx.LogI("nncp-bundle", les, "removed")
if !*dryRun {
- os.Remove(dstPath) // #nosec G104
+ os.Remove(dstPath)
+ if ctx.HdrUsage {
+ os.Remove(dstPath + nncp.HdrSuffix)
+ }
}
} else {
ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
if err = nncp.DirSync(dstDirPath); err != nil {
log.Fatalln("Error during syncing:", err)
}
+ if ctx.HdrUsage {
+ ctx.HdrWrite(pktEncBuf, dstPath)
+ }
}
}
for _, le := range les {
rxOnly = flag.Bool("rx", false, "Only receive packets")
txOnly = flag.Bool("tx", false, "Only transmit packets")
listOnly = flag.Bool("list", false, "Only list remote packets")
+ noCK = flag.Bool("nock", false, "Do not perform checksum checking")
onlyPktsRaw = flag.String("pkts", "", "Receive only those packets, comma-separated")
rxRate = flag.Int("rxrate", 0, "Maximal receive rate, pkts/sec")
txRate = flag.Int("txrate", 0, "Maximal transmit rate, pkts/sec")
onlineDeadline,
maxOnlineTime,
*listOnly,
+ *noCK,
onlyPkts,
)
ctx.LogD("caller", les, "checking tx existence")
txExists := false
for job := range ctx.Jobs(node.Id, nncp.TTx) {
- job.Fd.Close()
if job.PktEnc.Nice > call.Nice {
continue
}
call.OnlineDeadline,
call.MaxOnlineTime,
false,
+ call.NoCK,
nil,
)
# umask: "022"
# Omit progress showing by default
# noprogress: true
+ # Do not use .hdr files
+ # nohdr: true
# Enable notification email sending
# notify: {
# # xx: rx
# # addr: lan
# # when-tx-exists: true
+ # # nock: true
# #
# # autotoss: false
# # autotoss-doseen: true
"fmt"
"log"
"os"
+ "path/filepath"
"go.cypherpunks.ru/nncp/v5"
)
func usage() {
fmt.Fprintf(os.Stderr, nncp.UsageHeader())
fmt.Fprintf(os.Stderr, "nncp-check -- verify Rx/Tx packets checksum\n\n")
- fmt.Fprintf(os.Stderr, "Usage: %s [options]\nOptions:\n", os.Args[0])
+ fmt.Fprintf(os.Stderr, "Usage: %s [-nock] [options]\nOptions:\n", os.Args[0])
flag.PrintDefaults()
}
func main() {
var (
+ nock = flag.Bool("nock", false, "Process .nock files")
cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
nodeRaw = flag.String("node", "", "Process only that node")
spoolPath = flag.String("spool", "", "Override path to spool")
if nodeOnly != nil && nodeId != *nodeOnly.Id {
continue
}
- if !ctx.Check(node.Id) {
+ if *nock {
+ for job := range ctx.JobsNoCK(node.Id) {
+ if _, err = ctx.CheckNoCK(node.Id, job.HshValue); err != nil {
+ pktName := nncp.Base32Codec.EncodeToString(job.HshValue[:])
+ log.Println(filepath.Join(
+ ctx.Spool,
+ nodeId.String(),
+ string(nncp.TRx),
+ pktName+nncp.NoCKSuffix,
+ ), err)
+ isBad = true
+ }
+ }
+ } else if !ctx.Check(node.Id) {
isBad = true
}
}
ctx *nncp.Ctx,
conn nncp.ConnDeadlined,
nice uint8,
+ noCK bool,
nodeIdC chan *nncp.NodeId,
) {
state := nncp.SPState{
Ctx: ctx,
Nice: nice,
+ NoCK: noCK,
}
if err := state.StartR(conn); err == nil {
ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected")
bind = flag.String("bind", "[::]:5400", "Address to bind to")
inetd = flag.Bool("inetd", false, "Is it started as inetd service")
maxConn = flag.Int("maxconn", 128, "Maximal number of simultaneous connections")
+ noCK = flag.Bool("nock", false, "Do not perform checksum checking")
spoolPath = flag.String("spool", "", "Override path to spool")
logPath = flag.String("log", "", "Override path to logfile")
quiet = flag.Bool("quiet", false, "Print only errors")
os.Stderr.Close() // #nosec G104
conn := &InetdConn{os.Stdin, os.Stdout}
nodeIdC := make(chan *nncp.NodeId)
- go performSP(ctx, conn, nice, nodeIdC)
+ go performSP(ctx, conn, nice, *noCK, nodeIdC)
nodeId := <-nodeIdC
var autoTossFinish chan struct{}
var autoTossBadCode chan bool
ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted")
go func(conn net.Conn) {
nodeIdC := make(chan *nncp.NodeId)
- go performSP(ctx, conn, nice, nodeIdC)
+ go performSP(ctx, conn, nice, *noCK, nodeIdC)
nodeId := <-nodeIdC
var autoTossFinish chan struct{}
var autoTossBadCode chan bool
fmt.Fprintf(os.Stderr, " %s [options] -lock\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s [options] -node NODE -part\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s [options] -node NODE -seen\n", os.Args[0])
+ fmt.Fprintf(os.Stderr, " %s [options] -node NODE -nock\n", os.Args[0])
+ fmt.Fprintf(os.Stderr, " %s [options] -node NODE -hdr\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s [options] -node NODE {-rx|-tx}\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s [options] -node NODE -pkt PKT\n", os.Args[0])
fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays")
var (
cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
doTmp = flag.Bool("tmp", false, "Remove all temporary files")
+ doHdr = flag.Bool("hdr", false, "Remove all .hdr files")
doLock = flag.Bool("lock", false, "Remove all lock files")
nodeRaw = flag.String("node", "", "Node to remove files in")
doRx = flag.Bool("rx", false, "Process received packets")
doTx = flag.Bool("tx", false, "Process transferred packets")
doPart = flag.Bool("part", false, "Remove only .part files")
doSeen = flag.Bool("seen", false, "Remove only .seen files")
+ doNoCK = flag.Bool("nock", false, "Remove only .nock files")
older = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units")
dryRun = flag.Bool("dryrun", false, "Do not actually remove files")
pktRaw = flag.String("pkt", "", "Packet to remove")
ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping")
return nil
}
- if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) {
- ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
- if *dryRun {
- return nil
- }
- return os.Remove(path)
- }
- if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) {
+ if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) ||
+ (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) ||
+ (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) ||
+ (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) {
ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
return os.Remove(path)
}
- if !*doSeen &&
- !*doPart &&
+ if !*doSeen && !*doNoCK && !*doHdr && !*doPart &&
(*doRx || *doTx) &&
((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) {
ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
return nil
})
}
- if *pktRaw != "" || *doRx || *doSeen || *doPart {
+ if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doHdr || *doPart {
if err = remove(nncp.TRx); err != nil {
log.Fatalln("Can not remove:", err)
}
}
- if *pktRaw != "" || *doTx {
+ if *pktRaw != "" || *doTx || *doHdr {
if err = remove(nncp.TTx); err != nil {
log.Fatalln("Can not remove:", err)
}
fmt.Println(node.Name)
rxNums := make(map[uint8]int)
rxBytes := make(map[uint8]int64)
+ noCKNums := make(map[uint8]int)
+ noCKBytes := make(map[uint8]int64)
for job := range ctx.Jobs(node.Id, nncp.TRx) {
- job.Fd.Close() // #nosec G104
if *showPkt {
jobPrint(nncp.TRx, job)
}
rxNums[job.PktEnc.Nice] = rxNums[job.PktEnc.Nice] + 1
rxBytes[job.PktEnc.Nice] = rxBytes[job.PktEnc.Nice] + job.Size
}
+ for job := range ctx.JobsNoCK(node.Id) {
+ if *showPkt {
+ jobPrint(nncp.TRx, job)
+ }
+ noCKNums[job.PktEnc.Nice] = noCKNums[job.PktEnc.Nice] + 1
+ noCKBytes[job.PktEnc.Nice] = noCKBytes[job.PktEnc.Nice] + job.Size
+ }
txNums := make(map[uint8]int)
txBytes := make(map[uint8]int64)
for job := range ctx.Jobs(node.Id, nncp.TTx) {
- job.Fd.Close() // #nosec G104
if *showPkt {
jobPrint(nncp.TTx, job)
}
for nice = 1; nice > 0; nice++ {
rxNum, rxExists := rxNums[nice]
txNum, txExists := txNums[nice]
- if !(rxExists || txExists) {
+ noCKNum, noCKExists := noCKNums[nice]
+ if !(rxExists || txExists || noCKExists) {
continue
}
fmt.Printf(
- "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts\n",
+ "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts",
nncp.NicenessFmt(nice),
humanize.IBytes(uint64(rxBytes[nice])),
rxNum,
humanize.IBytes(uint64(txBytes[nice])),
txNum,
)
+ if noCKExists {
+ fmt.Printf(
+ " | NoCK: % 10s, % 3d pkts",
+ humanize.IBytes(uint64(noCKBytes[nice])),
+ noCKNum,
+ )
+ }
+ fmt.Printf("\n")
}
}
}
"os"
"path/filepath"
- xdr "github.com/davecgh/go-xdr/xdr2"
"go.cypherpunks.ru/nncp/v5"
)
isBad = true
continue
}
- var pktEnc nncp.PktEnc
- _, err = xdr.Unmarshal(fd, &pktEnc)
+ pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 {
ctx.LogD("nncp-xfer", les, "is not a packet")
fd.Close() // #nosec G104
isBad = true
}
}
+ if ctx.HdrUsage {
+ ctx.HdrWrite(pktEncRaw, filepath.Join(
+ ctx.Spool,
+ nodeId.String(),
+ string(nncp.TRx),
+ tmp.Checksum(),
+ ))
+ }
}
}
}
les = les[:len(les)-1]
for job := range ctx.Jobs(&nodeId, nncp.TTx) {
- pktName := filepath.Base(job.Fd.Name())
+ pktName := filepath.Base(job.Path)
les := append(les, nncp.LE{K: "Pkt", V: pktName})
if job.PktEnc.Nice > nice {
ctx.LogD("nncp-xfer", les, "too nice")
- job.Fd.Close() // #nosec G104
continue
}
if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) {
ctx.LogD("nncp-xfer", les, "already exists")
- job.Fd.Close() // #nosec G104
continue
}
if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) {
ctx.LogD("nncp-xfer", les, "already exists")
- job.Fd.Close() // #nosec G104
continue
}
tmp, err := nncp.TempFile(dstPath, "xfer")
if err != nil {
ctx.LogE("nncp-xfer", les, err, "mktemp")
- job.Fd.Close() // #nosec G104
isBad = true
break
}
les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()})
ctx.LogD("nncp-xfer", les, "created")
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ ctx.LogE("nncp-xfer", les, err, "open")
+ tmp.Close() // #nosec G104
+ isBad = true
+ continue
+ }
bufW := bufio.NewWriter(tmp)
copied, err := nncp.CopyProgressed(
- bufW, bufio.NewReader(job.Fd), "Tx",
+ bufW, bufio.NewReader(fd), "Tx",
append(les, nncp.LE{K: "FullSize", V: job.Size}),
ctx.ShowPrgrs,
)
- job.Fd.Close() // #nosec G104
+ fd.Close() // #nosec G104
if err != nil {
ctx.LogE("nncp-xfer", les, err, "copy")
tmp.Close() // #nosec G104
les = les[:len(les)-1]
ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "")
if !*keep {
- if err = os.Remove(job.Fd.Name()); err != nil {
+ if err = os.Remove(job.Path); err != nil {
ctx.LogE("nncp-xfer", les, err, "remove")
isBad = true
+ } else if ctx.HdrUsage {
+ os.Remove(job.Path + nncp.HdrSuffix)
}
}
}
UmaskForce *int
Quiet bool
ShowPrgrs bool
+ HdrUsage bool
Debug bool
NotifyFile *FromToJSON
NotifyFreq *FromToJSON
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/hjson/hjson-go v3.1.0+incompatible
- github.com/klauspost/compress v1.11.4
+ github.com/klauspost/compress v1.11.7
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
go.cypherpunks.ru/balloon v1.1.1
go.cypherpunks.ru/recfile v0.4.3
- golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
- golang.org/x/net v0.0.0-20201224014010-6772e930b67b
- golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
- golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1
+ golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
+ golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d
+ golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43
+ golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
)
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/hjson/hjson-go v3.1.0+incompatible h1:DY/9yE8ey8Zv22bY+mHV1uk2yRy0h8tKhZ77hEdi0Aw=
github.com/hjson/hjson-go v3.1.0+incompatible/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio=
-github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU=
-github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
+github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
+github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
go.cypherpunks.ru/recfile v0.4.3 h1:ephokihmV//p0ob6gx2FWXvm28/NBDbWTOJPUNahxO8=
go.cypherpunks.ru/recfile v0.4.3/go.mod h1:sR+KajB+vzofL3SFVFwKt3Fke0FaCcN1g3YPNAhU3qI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=
-golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
+golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g=
+golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
-golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw=
-golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
+golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d h1:1aflnvSoWWLI2k/dMUAl5lvU1YO4Mb4hz0gh+1rjcxU=
+golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos=
-golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 h1:SgQ6LNaYJU0JIuEHv9+s6EbhSCwYeAf5Yvj6lpYlqAE=
+golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
+golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
+golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
if err == nil {
nodeS = node.Name
}
+ var sizeParsed uint64
var size string
if sizeRaw, exists := le["Size"]; exists {
- sp, err := strconv.ParseUint(sizeRaw, 10, 64)
+ sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64)
if err != nil {
return "", err
}
- size = humanize.IBytes(uint64(sp))
+ size = humanize.IBytes(sizeParsed)
}
var msg string
if err, exists := le["Err"]; exists {
msg += ": " + err
}
-
case "sp-info":
nice, err := NicenessParse(le["Nice"])
if err != nil {
"Packet %s (%s) (nice %s)",
le["Pkt"], size, NicenessFmt(nice),
)
- offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64)
- if err != nil {
- return "", err
- }
- sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
- if err != nil {
- return "", err
+ if offset := le["Offset"]; offset != "" {
+ offsetParsed, err := strconv.ParseUint(offset, 10, 64)
+ if err != nil {
+ return "", err
+ }
+ msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
}
- msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
if m, exists := le["Msg"]; exists {
msg += ": " + m
}
if err != nil {
return "", err
}
- sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
- if err != nil {
- return "", err
- }
msg += fmt.Sprintf(
"%s %d%% (%s / %s)",
le["Pkt"],
package nncp
import (
- "io"
+ "bytes"
"os"
"path/filepath"
+ "strings"
xdr "github.com/davecgh/go-xdr/xdr2"
)
const (
TRx TRxTx = "rx"
TTx TRxTx = "tx"
+
+ HdrSuffix = ".hdr"
)
type Job struct {
PktEnc *PktEnc
- Fd *os.File
+ Path string
Size int64
HshValue *[32]byte
}
-func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) {
+ var pktEnc PktEnc
+ _, err := xdr.Unmarshal(fd, &pktEnc)
+ if err != nil {
+ return nil, nil, err
+ }
+ var raw bytes.Buffer
+ if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
+ panic(err)
+ }
+ return &pktEnc, raw.Bytes(), nil
+}
+
+func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
+ tmpHdr, err := ctx.NewTmpFile()
+ if err != nil {
+ ctx.LogE("hdr-write", LEs{}, err, "new")
+ return err
+ }
+ if _, err = tmpHdr.Write(pktEncRaw); err != nil {
+ ctx.LogE("hdr-write", LEs{}, err, "write")
+ os.Remove(tmpHdr.Name())
+ return err
+ }
+ if err = tmpHdr.Close(); err != nil {
+ ctx.LogE("hdr-write", LEs{}, err, "close")
+ os.Remove(tmpHdr.Name())
+ return err
+ }
+ if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
+ ctx.LogE("hdr-write", LEs{}, err, "rename")
+ os.Remove(tmpHdr.Name())
+ return err
+ }
+ return nil
+}
+
+func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job {
rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
jobs := make(chan Job, 16)
go func() {
return
}
for _, fi := range fis {
- hshValue, err := Base32Codec.DecodeString(fi.Name())
- if err != nil {
- continue
+ name := fi.Name()
+ var hshValue []byte
+ if nock {
+ if !strings.HasSuffix(name, NoCKSuffix) ||
+ len(name) != Base32Encoded32Len+len(NoCKSuffix) {
+ continue
+ }
+ hshValue, err = Base32Codec.DecodeString(
+ strings.TrimSuffix(name, NoCKSuffix),
+ )
+ } else {
+ if len(name) != Base32Encoded32Len {
+ continue
+ }
+ hshValue, err = Base32Codec.DecodeString(name)
}
- fd, err := os.Open(filepath.Join(rxPath, fi.Name()))
if err != nil {
continue
}
- var pktEnc PktEnc
- if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 {
- fd.Close() // #nosec G104
+ pth := filepath.Join(rxPath, name)
+ hdrExists := true
+ var fd *os.File
+ if nock {
+ fd, err = os.Open(pth)
+ } else {
+ fd, err = os.Open(pth + HdrSuffix)
+ if err != nil && os.IsNotExist(err) {
+ hdrExists = false
+ fd, err = os.Open(pth)
+ }
+ }
+ if err != nil {
continue
}
- if _, err = fd.Seek(0, io.SeekStart); err != nil {
- fd.Close() // #nosec G104
+ pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
+ fd.Close() // #nosec G104
+ if err != nil || pktEnc.Magic != MagicNNCPEv4 {
continue
}
ctx.LogD("jobs", LEs{
{"XX", string(xx)},
{"Node", pktEnc.Sender},
- {"Name", fi.Name()},
+ {"Name", name},
{"Nice", int(pktEnc.Nice)},
{"Size", fi.Size()},
}, "taken")
+ if !hdrExists && ctx.HdrUsage {
+ ctx.HdrWrite(pktEncRaw, pth)
+ }
job := Job{
- PktEnc: &pktEnc,
- Fd: fd,
+ PktEnc: pktEnc,
+ Path: pth,
Size: fi.Size(),
HshValue: new([32]byte),
}
}()
return jobs
}
+
+func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+ return ctx.jobsFind(nodeId, xx, false)
+}
+
+func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
+ return ctx.jobsFind(nodeId, TRx, true)
+}
switch v := le.V.(type) {
case int, int8, uint8, int64, uint64:
val = fmt.Sprintf("%d", v)
+ case bool:
+ val = fmt.Sprintf("%v", v)
default:
val = fmt.Sprintf("%s", v)
}
along with this program. If not, see <http://www.gnu.org/licenses/>.`
)
+const Base32Encoded32Len = 52
+
var (
- Version string = "6.0.0"
+ Version string = "6.1.0"
Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding)
)
size, padSize int64,
data io.Reader,
out io.Writer,
-) error {
+) ([]byte, error) {
pubEph, prvEph, err := box.GenerateKey(rand.Reader)
if err != nil {
- return err
+ return nil, err
}
var pktBuf bytes.Buffer
if _, err := xdr.Marshal(&pktBuf, pkt); err != nil {
- return err
+ return nil, err
}
tbs := PktTbs{
Magic: MagicNNCPEv4,
}
var tbsBuf bytes.Buffer
if _, err = xdr.Marshal(&tbsBuf, &tbs); err != nil {
- return err
+ return nil, err
}
signature := new([ed25519.SignatureSize]byte)
copy(signature[:], ed25519.Sign(our.SignPrv, tbsBuf.Bytes()))
ExchPub: *pubEph,
Sign: *signature,
}
- if _, err = xdr.Marshal(out, &pktEnc); err != nil {
- return err
+ tbsBuf.Reset()
+ if _, err = xdr.Marshal(&tbsBuf, &pktEnc); err != nil {
+ return nil, err
+ }
+ pktEncRaw := tbsBuf.Bytes()
+ if _, err = out.Write(pktEncRaw); err != nil {
+ return nil, err
}
sharedKey := new([32]byte)
curve25519.ScalarMult(sharedKey, prvEph, their.ExchPub)
kdf, err := blake2b.NewXOF(KDFXOFSize, sharedKey[:])
if err != nil {
- return err
+ return nil, err
}
if _, err = kdf.Write(MagicNNCPEv4[:]); err != nil {
- return err
+ return nil, err
}
key := make([]byte, chacha20poly1305.KeySize)
if _, err = io.ReadFull(kdf, key); err != nil {
- return err
+ return nil, err
}
aead, err := chacha20poly1305.New(key)
if err != nil {
- return err
+ return nil, err
}
nonce := make([]byte, aead.NonceSize())
sizeBuf := make([]byte, 8+aead.Overhead())
binary.BigEndian.PutUint64(sizeBuf, uint64(sizeWithTags(int64(fullSize))))
if _, err = out.Write(aead.Seal(sizeBuf[:0], nonce, sizeBuf[:8], nil)); err != nil {
- return err
+ return nil, err
}
lr := io.LimitedReader{R: data, N: size}
mr := io.MultiReader(&pktBuf, &lr)
written, err := aeadProcess(aead, nonce, true, mr, out)
if err != nil {
- return err
+ return nil, err
}
if written != fullSize {
- return io.ErrUnexpectedEOF
+ return nil, io.ErrUnexpectedEOF
}
if padSize > 0 {
if _, err = io.ReadFull(kdf, key); err != nil {
- return err
+ return nil, err
}
kdf, err = blake2b.NewXOF(blake2b.OutputLengthUnknown, key)
if err != nil {
- return err
+ return nil, err
}
if _, err = io.CopyN(out, kdf, padSize); err != nil {
- return err
+ return nil, err
}
}
- return nil
+ return pktEncRaw, nil
}
func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) (bool, error) {
if err != nil {
panic(err)
}
- err = PktEncWrite(
+ _, err = PktEncWrite(
nodeOur,
nodeTheir.Their(),
pkt,
if err != nil {
panic(err)
}
- err = PktEncWrite(
+ _, err = PktEncWrite(
node1,
node2.Their(),
pkt,
progressBarsLock.Unlock()
}
what := pkt
- if len(what) >= 52 { // Base32 encoded
+ if len(what) >= Base32Encoded32Len { // Base32 encoded
what = what[:16] + ".." + what[len(what)-16:]
}
what = prefix + " " + what
"bytes"
"crypto/subtle"
"errors"
+ "hash"
"io"
"os"
"path/filepath"
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/flynn/noise"
+ "golang.org/x/crypto/blake2b"
)
const (
SPHeadOverhead = 4
)
+type SPCheckerQueues struct {
+ appeared chan *[32]byte
+ checked chan *[32]byte
+}
+
var (
MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
DefaultDeadline = 10 * time.Second
PingTimeout = time.Minute
- spCheckerToken chan struct{}
+ spCheckers = make(map[NodeId]*SPCheckerQueues)
)
+type FdAndFullSize struct {
+ fd *os.File
+ fullSize int64
+}
+
+type HasherAndOffset struct {
+ h hash.Hash
+ offset uint64
+}
+
type SPType uint8
const (
panic(err)
}
SPFileOverhead = buf.Len()
- spCheckerToken = make(chan struct{}, 1)
- spCheckerToken <- struct{}{}
}
func MarshalSP(typ SPType, sp interface{}) []byte {
Ctx *Ctx
Node *Node
Nice uint8
+ NoCK bool
onlineDeadline time.Duration
maxOnlineTime time.Duration
hs *noise.HandshakeState
listOnly bool
onlyPkts map[[32]byte]bool
writeSPBuf bytes.Buffer
+ fds map[string]FdAndFullSize
+ fileHashers map[string]*HasherAndOffset
+ checkerQueues SPCheckerQueues
sync.RWMutex
}
for range state.pings {
}
}()
+ go func() {
+ for _, s := range state.fds {
+ s.fd.Close()
+ }
+ }()
}
func (state *SPState) NotAlive() bool {
state.Ctx.UnlockDir(state.txLock)
}
+func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
+ for hshValue := range appeared {
+ les := LEs{
+ {"XX", string(TRx)},
+ {"Node", nodeId},
+ {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
+ }
+ ctx.LogD("sp-checker", les, "checking")
+ size, err := ctx.CheckNoCK(nodeId, hshValue)
+ les = append(les, LE{"Size", size})
+ if err != nil {
+ ctx.LogE("sp-checker", les, err, "")
+ continue
+ }
+ ctx.LogI("sp-done", les, "")
+ go func(hsh *[32]byte) { checked <- hsh }(hshValue)
+ }
+}
+
func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
state.writeSPBuf.Reset()
n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
- job.Fd.Close() // #nosec G104
if job.PktEnc.Nice > nice {
continue
}
state.infosTheir = make(map[[32]byte]*SPInfo)
state.started = started
state.xxOnly = xxOnly
+
var buf []byte
var payload []byte
state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
return err
}
+func (state *SPState) closeFd(pth string) {
+ s, exists := state.fds[pth]
+ delete(state.fds, pth)
+ if exists {
+ s.fd.Close()
+ }
+}
+
func (state *SPState) StartWorkers(
conn ConnDeadlined,
infosPayloads [][]byte,
payload []byte,
) error {
les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
+ state.fds = make(map[string]FdAndFullSize)
+ state.fileHashers = make(map[string]*HasherAndOffset)
state.isDead = make(chan struct{})
if state.maxOnlineTime > 0 {
state.mustFinishAt = state.started.Add(state.maxOnlineTime)
}
+ // Checker
+ if !state.NoCK {
+ queues := spCheckers[*state.Node.Id]
+ if queues == nil {
+ queues = &SPCheckerQueues{
+ appeared: make(chan *[32]byte),
+ checked: make(chan *[32]byte),
+ }
+ spCheckers[*state.Node.Id] = queues
+ go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
+ }
+ state.checkerQueues = *queues
+ go func() {
+ for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+ if job.PktEnc.Nice <= state.Nice {
+ state.checkerQueues.appeared <- job.HshValue
+ }
+ }
+ }()
+ state.wg.Add(1)
+ go func() {
+ defer state.wg.Done()
+ for {
+ select {
+ case <-state.isDead:
+ return
+ case hsh := <-state.checkerQueues.checked:
+ state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+ }
+ }
+ }()
+ }
+
// Remaining handshake payload sending
if len(infosPayloads) > 1 {
state.wg.Add(1)
{"Size", int64(freq.Offset)},
}...)
state.Ctx.LogD("sp-file", lesp, "queueing")
- fd, err := os.Open(filepath.Join(
+ pth := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
Base32Codec.EncodeToString(freq.Hash[:]),
- ))
- if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
- return
- }
- fi, err := fd.Stat()
- if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
- return
+ )
+ fdAndFullSize, exists := state.fds[pth]
+ if !exists {
+ fd, err := os.Open(pth)
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "")
+ return
+ }
+ fi, err := fd.Stat()
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "")
+ return
+ }
+ fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+ state.fds[pth] = fdAndFullSize
}
- fullSize := fi.Size()
+ fd := fdAndFullSize.fd
+ fullSize := fdAndFullSize.fullSize
var buf []byte
if freq.Offset < uint64(fullSize) {
state.Ctx.LogD("sp-file", lesp, "seeking")
buf = buf[:n]
state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
}
- fd.Close() // #nosec G104
+ state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Hash: freq.Hash,
Offset: freq.Offset,
state.Lock()
state.queueTheir = nil
state.Unlock()
+
case SPTypePing:
state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+
case SPTypeInfo:
infosGot = true
lesp := append(les, LE{"Type", "info"})
}
continue
}
+ if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
+ state.Ctx.LogI("sp-info", lesp, "still non checksummed")
+ continue
+ }
fi, err := os.Stat(pktPath + PartSuffix)
var offset int64
if err == nil {
SPFreq{info.Hash, uint64(offset)},
))
}
+
case SPTypeFile:
lesp := append(les, LE{"Type", "file"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
string(TRx),
)
filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+ filePathPart := filePath + PartSuffix
state.Ctx.LogD("sp-file", lesp, "opening part")
- fd, err := os.OpenFile(
- filePath+PartSuffix,
- os.O_RDWR|os.O_CREATE,
- os.FileMode(0666),
- )
- if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
- return nil, err
+ fdAndFullSize, exists := state.fds[filePathPart]
+ var fd *os.File
+ if exists {
+ fd = fdAndFullSize.fd
+ } else {
+ fd, err = os.OpenFile(
+ filePathPart,
+ os.O_RDWR|os.O_CREATE,
+ os.FileMode(0666),
+ )
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "")
+ return nil, err
+ }
+ state.fds[filePathPart] = FdAndFullSize{fd: fd}
+ if file.Offset == 0 {
+ h, err := blake2b.New256(nil)
+ if err != nil {
+ panic(err)
+ }
+ state.fileHashers[filePath] = &HasherAndOffset{h: h}
+ }
}
state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "")
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
return nil, err
}
state.Ctx.LogD("sp-file", lesp, "writing")
- _, err = fd.Write(file.Payload)
- if err != nil {
+ if _, err = fd.Write(file.Payload); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "")
- fd.Close() // #nosec G104
+ state.closeFd(filePathPart)
return nil, err
}
+ hasherAndOffset, hasherExists := state.fileHashers[filePath]
+ if hasherExists {
+ if hasherAndOffset.offset == file.Offset {
+ if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+ panic(err)
+ }
+ hasherAndOffset.offset += uint64(len(file.Payload))
+ } else {
+ state.Ctx.LogE(
+ "sp-file", lesp,
+ errors.New("offset differs"),
+ "deleting hasher",
+ )
+ delete(state.fileHashers, filePath)
+ hasherExists = false
+ }
+ }
ourSize := int64(file.Offset + uint64(len(file.Payload)))
lesp[len(lesp)-1].V = ourSize
fullsize := int64(0)
Progress("Rx", lesp)
}
if fullsize != ourSize {
- fd.Close() // #nosec G104
continue
}
- <-spCheckerToken
- go func() {
- defer func() {
- spCheckerToken <- struct{}{}
- }()
- if err := fd.Sync(); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "sync")
- fd.Close() // #nosec G104
- return
- }
- state.wg.Add(1)
- defer state.wg.Done()
- if _, err = fd.Seek(0, io.SeekStart); err != nil {
- fd.Close() // #nosec G104
- state.Ctx.LogE("sp-file", lesp, err, "")
- return
- }
- state.Ctx.LogD("sp-file", lesp, "checking")
- gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
- fd.Close() // #nosec G104
- if err != nil || !gut {
+ err = fd.Sync()
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
+ state.closeFd(filePathPart)
+ continue
+ }
+ if hasherExists {
+ if !bytes.Equal(hasherAndOffset.h.Sum(nil), file.Hash[:]) {
state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
- return
+ continue
}
- state.Ctx.LogI("sp-done", lesp, "")
- if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
+ if err = os.Rename(filePathPart, filePath); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "rename")
- return
+ continue
}
if err = DirSync(dirToSync); err != nil {
state.Ctx.LogE("sp-file", lesp, err, "sync")
- return
+ continue
}
- state.Lock()
- delete(state.infosTheir, *file.Hash)
- state.Unlock()
+ state.Ctx.LogI("sp-file", lesp, "done")
state.wg.Add(1)
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
state.wg.Done()
}()
- }()
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ if !state.Ctx.HdrUsage {
+ state.closeFd(filePathPart)
+ continue
+ }
+ if _, err = fd.Seek(0, io.SeekStart); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "seek")
+ state.closeFd(filePathPart)
+ continue
+ }
+ _, pktEncRaw, err := state.Ctx.HdrRead(fd)
+ state.closeFd(filePathPart)
+ if err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
+ continue
+ }
+ state.Ctx.HdrWrite(pktEncRaw, filePath)
+ continue
+ }
+ state.closeFd(filePathPart)
+ if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "rename")
+ continue
+ }
+ if err = DirSync(dirToSync); err != nil {
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
+ continue
+ }
+ state.Ctx.LogI("sp-file", lesp, "downloaded")
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ if !state.NoCK {
+ state.checkerQueues.appeared <- file.Hash
+ }
+
case SPTypeDone:
lesp := append(les, LE{"Type", "done"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
return nil, err
}
lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
+ lesp = append(lesp, LE{"XX", string(TTx)})
state.Ctx.LogD("sp-done", lesp, "removing")
- err := os.Remove(filepath.Join(
+ pth := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
Base32Codec.EncodeToString(done.Hash[:]),
- ))
- lesp = append(lesp, LE{"XX", string(TTx)})
- if err == nil {
+ )
+ if err = os.Remove(pth); err == nil {
state.Ctx.LogI("sp-done", lesp, "")
+ if state.Ctx.HdrUsage {
+ os.Remove(pth + HdrSuffix)
+ }
} else {
state.Ctx.LogE("sp-done", lesp, err, "")
}
+
case SPTypeFreq:
lesp := append(les, LE{"Type", "freq"})
state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
} else {
state.Ctx.LogD("sp-process", lesp, "unknown")
}
+
default:
state.Ctx.LogE(
"sp-process",
return fd.Close()
}
+func (tmp *TmpFileWHash) Checksum() string {
+ return Base32Codec.EncodeToString(tmp.Hsh.Sum(nil))
+}
+
func (tmp *TmpFileWHash) Commit(dir string) error {
var err error
if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
if err = tmp.Fd.Close(); err != nil {
return err
}
- checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil))
+ checksum := tmp.Checksum()
tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit")
if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil {
return err
}
defer decompressor.Close()
for job := range ctx.Jobs(nodeId, TRx) {
- pktName := filepath.Base(job.Fd.Name())
+ pktName := filepath.Base(job.Path)
les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
if job.PktEnc.Nice > nice {
ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
- job.Fd.Close() // #nosec G104
continue
}
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ ctx.LogE("rx", les, err, "open")
+ isBad = true
+ continue
+ }
+
pipeR, pipeW := io.Pipe()
go func(job Job) error {
pipeWB := bufio.NewWriter(pipeW)
- _, _, err := PktEncRead(
- ctx.Self,
- ctx.Neigh,
- bufio.NewReader(job.Fd),
- pipeWB,
- )
- job.Fd.Close() // #nosec G104
+ _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
+ fd.Close() // #nosec G104
if err != nil {
return pipeW.CloseWithError(err)
}
return pipeW.Close()
}(job)
var pkt Pkt
- var err error
var pktSize int64
var pktSizeBlocks int64
if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+ if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
fd.Close() // #nosec G104
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
+ if err = os.Remove(job.Path); err != nil {
ctx.LogE("rx", les, err, "remove")
isBad = true
+ } else if ctx.HdrUsage {
+ os.Remove(job.Path + HdrSuffix)
}
}
case PktTypeFile:
ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+ if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
fd.Close() // #nosec G104
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
+ if err = os.Remove(job.Path); err != nil {
ctx.LogE("rx", les, err, "remove")
isBad = true
+ } else if ctx.HdrUsage {
+ os.Remove(job.Path + HdrSuffix)
}
if len(sendmail) > 0 && ctx.NotifyFile != nil {
cmd := exec.Command(
ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+ if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
fd.Close() // #nosec G104
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
+ if err = os.Remove(job.Path); err != nil {
ctx.LogE("rx", les, err, "remove")
isBad = true
+ } else if ctx.HdrUsage {
+ os.Remove(job.Path + HdrSuffix)
}
if len(sendmail) > 0 && ctx.NotifyFreq != nil {
cmd := exec.Command(
ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+ if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
fd.Close() // #nosec G104
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
+ if err = os.Remove(job.Path); err != nil {
ctx.LogE("rx", les, err, "remove")
isBad = true
+ } else if ctx.HdrUsage {
+ os.Remove(job.Path + HdrSuffix)
}
}
default:
}
for job := range ctx.Jobs(ctx.Self.Id, TTx) {
var buf bytes.Buffer
- _, _, err := PktEncRead(ctx.Self, ctx.Neigh, job.Fd, &buf)
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ t.Error(err)
+ return false
+ }
+ _, _, err = PktEncRead(ctx.Self, ctx.Neigh, fd, &buf)
if err != nil {
t.Error(err)
return false
}
copy(pktTrans.Path[:], nodeOur.Id[:])
var dst bytes.Buffer
- if err := PktEncWrite(
+ if _, err := PktEncWrite(
ctx.Self,
ctx.Neigh[*nodeOur.Id],
&pktTrans,
}
expectedSize := size
for i := 0; i < len(hops); i++ {
- expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
+ expectedSize = PktEncOverhead +
+ PktSizeOverhead +
+ sizeWithTags(PktOverhead+expectedSize)
}
padSize := minSize - expectedSize
if padSize < 0 {
errs := make(chan error)
curSize := size
pipeR, pipeW := io.Pipe()
+ var pktEncRaw []byte
go func(size int64, src io.Reader, dst io.WriteCloser) {
ctx.LogD("tx", LEs{
{"Node", hops[0].Id},
{"Nice", int(nice)},
{"Size", size},
}, "wrote")
- errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
+ pktEncRaw, err = PktEncWrite(
+ ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+ )
+ errs <- err
dst.Close() // #nosec G104
}(curSize, src, pipeW)
- curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
+ curSize = PktEncOverhead +
+ PktSizeOverhead +
+ sizeWithTags(PktOverhead+curSize) +
+ padSize
var pipeRPrev io.Reader
for i := 1; i < len(hops); i++ {
{"Nice", int(nice)},
{"Size", size},
}, "trns wrote")
- errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+ _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+ errs <- err
dst.Close() // #nosec G104
}(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104
+ if err != nil {
+ return lastNode, err
+ }
+ if ctx.HdrUsage {
+ ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
+ }
return lastNode, err
}
return false
}
txJob := sentJobs[0]
- defer txJob.Fd.Close()
+ fd, err := os.Open(txJob.Path)
+ if err != nil {
+ panic(err)
+ }
+ defer fd.Close()
var bufR bytes.Buffer
- if _, err = io.Copy(&bufR, txJob.Fd); err != nil {
+ if _, err = io.Copy(&bufR, fd); err != nil {
panic(err)
}
var bufW bytes.Buffer