From b47dbfe6687569650fa544a4ecf3e4ea388390cb Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sun, 7 Nov 2021 20:16:03 +0300 Subject: [PATCH] Streamed NNCPE format --- doc/cmd/nncp-exec.texi | 5 - doc/cmd/nncp-file.texi | 22 +- doc/news.ru.texi | 30 +++ doc/news.texi | 29 +++ src/cmd/nncp-bundle/main.go | 2 +- src/cmd/nncp-call/main.go | 2 +- src/cmd/nncp-caller/main.go | 2 +- src/cmd/nncp-cfgdir/main.go | 2 +- src/cmd/nncp-cfgenc/main.go | 2 +- src/cmd/nncp-cfgmin/main.go | 2 +- src/cmd/nncp-cfgnew/main.go | 2 +- src/cmd/nncp-check/main.go | 2 +- src/cmd/nncp-cronexpr/main.go | 2 +- src/cmd/nncp-daemon/main.go | 2 +- src/cmd/nncp-exec/main.go | 11 +- src/cmd/nncp-file/main.go | 26 +- src/cmd/nncp-freq/main.go | 2 +- src/cmd/nncp-hash/main.go | 2 +- src/cmd/nncp-log/main.go | 2 +- src/cmd/nncp-pkt/main.go | 4 +- src/cmd/nncp-reass/main.go | 2 +- src/cmd/nncp-rm/main.go | 2 +- src/cmd/nncp-stat/main.go | 2 +- src/cmd/nncp-toss/main.go | 2 +- src/cmd/nncp-trns/main.go | 12 +- src/cmd/nncp-xfer/main.go | 2 +- src/go.mod | 2 +- src/jobs.go | 2 + src/magic.go | 6 +- src/nncp.go | 2 +- src/pkt.go | 471 ++++++++++++++++++++++------------ src/pkt_test.go | 64 +++-- src/progress.go | 16 +- src/toss.go | 14 +- src/toss_test.go | 22 +- src/tx.go | 429 ++++++++++++------------------- src/tx_test.go | 24 +- 37 files changed, 696 insertions(+), 531 deletions(-) diff --git a/doc/cmd/nncp-exec.texi b/doc/cmd/nncp-exec.texi index 7eb5e8a..eec6a7e 100644 --- a/doc/cmd/nncp-exec.texi +++ b/doc/cmd/nncp-exec.texi @@ -12,11 +12,6 @@ Body is read from @code{stdin} into memory and compressed (unless execute specified @ref{CfgExec, handle} command with @option{ARG*} appended and decompressed body fed to command's @code{stdin}. -If @option{-use-tmp} option is specified, then @code{stdin} data is read -into temporary file first, requiring twice more disk space, but no -memory requirements. 
@ref{StdinTmpFile, Same temporary file} rules -applies as with @ref{nncp-file, nncp-file -} command. - For example, if remote side has following configuration file for your node: diff --git a/doc/cmd/nncp-file.texi b/doc/cmd/nncp-file.texi index 24dafde..0a80a4c 100644 --- a/doc/cmd/nncp-file.texi +++ b/doc/cmd/nncp-file.texi @@ -11,23 +11,11 @@ destination file name in remote's @ref{CfgIncoming, incoming} directory. If this file already exists there, then counter will be appended to it. -This command queues file in @ref{Spool, spool} directory immediately -(through the temporary file of course) -- so pay attention that sending -2 GiB file will create 2 GiB outbound encrypted packet. - -@anchor{StdinTmpFile} -If @file{SRC} equals to @file{-}, then create an encrypted temporary -file and copy everything taken from @code{stdin} to it and use for outbound -packet creation. Pay attention that if you want to send 1 GiB of data -taken from @code{stdin}, then you have to have more than 2 GiB of disk space -for that temporary file and resulting encrypted packet. You can control -temporary file location directory with @env{$TMPDIR} environment -variable. Encryption is performed in AEAD mode with -@url{https://cr.yp.to/chacha.html, ChaCha20}-@url{https://en.wikipedia.org/wiki/Poly1305, Poly1305} -algorithms. Data is divided on 128 KiB blocks. Each block is encrypted -with increasing nonce counter. File is deletes immediately after -creation, so even if program crashes -- disk space will be reclaimed, no -need in cleaning it up later. +This command queues file in @ref{Spool, spool} directory immediately -- +so pay attention that sending 2 GiB file will create 2 GiB outbound +encrypted packet. + +If @file{SRC} equals to @file{-}, then data is read from @code{stdin}. 
If @file{SRC} points to directory, then @url{https://pubs.opengroup.org/onlinepubs/9699919799/utilities/pax.html#tag_20_92_13_01, pax archive} diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 210a440..697d0a1 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,6 +1,36 @@ @node Новости @section Новости +@node Релиз 8.0.0 +@subsection Релиз 8.0.0 +@itemize + +@item +@strong{Несовместимое} изменение формата зашифрованных пакетов: размеры +полезной нагрузки и дополнения посылаются прямо внутри зашифрованного +потока. Это даёт возможность потоково создавать шифрованные пакеты, без +знания размеров заранее, без создания временного файла или буферизации в +памяти. + +@item +Производится корректная проверка дополнения зашифрованного пакета. Это +не критичная проблема, но прежде ни размер, ни значение дополнения не +были аутентифицированы, позволяя циклично откусывать по байту с конца и +узнавать размер полезной нагрузки, наблюдая за реакцией по обработке +такого зашифрованного пакета. + +@item +@command{nncp-exec} больше не имеет @option{-use-tmp} опции, из-за +совместимого с потоковой работой формата зашифрованных пакетов. + +@item +У @command{nncp-file} и @command{nncp-exec} команд появилась опция +@option{-maxsize}, ограничивающая максимальный результирующий размер +зашифрованного пакета (возвращая ошибку если он превышен). Может быть +полезно, так как размер полезной нагрузки может быть неизвестен заранее. + +@end itemize + @node Релиз 7.7.0 @subsection Релиз 7.7.0 @itemize diff --git a/doc/news.texi b/doc/news.texi index ad825f1..1cf64c5 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,6 +3,35 @@ See also this page @ref{Новости, on russian}. +@node Release 8_0_0 +@section Release 8.0.0 +@itemize + +@item +@strong{Incompatible} encrypted packet format change: payload and pad +sizes are sent in-bound in the encrypted stream. 
That gives ability to +streamingly create encrypted packets, without knowing sizes in advance, +without creating temporary file or buffer data in memory. + +@item +Proper encrypted packet padding verification is done now. This is not +critical issue, but previously neither padding value, nor its size were +authenticated, giving ability to iteratively strip trailing bytes and +determine payload's size by observing the reaction of the encrypted +packet processing. + +@item +@command{nncp-exec} loses its @option{-use-tmp} option, because of +streaming-compatible encrypted packets format. + +@item +@command{nncp-file} and @command{nncp-exec} commands have +@option{-maxsize} option, limiting maximal resulting encrypted packet's +maximal size (returning error if it is exceeded). Could be useful, +because no payload size could be known in advance. + +@end itemize + @node Release 7_7_0 @section Release 7.7.0 @itemize diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index 52ce4f6..def4f86 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -34,7 +34,7 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) const ( diff --git a/src/cmd/nncp-call/main.go b/src/cmd/nncp-call/main.go index 03a5a1a..0ac016b 100644 --- a/src/cmd/nncp-call/main.go +++ b/src/cmd/nncp-call/main.go @@ -26,7 +26,7 @@ import ( "strings" "time" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index 4024b5b..0e5de53 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -27,7 +27,7 @@ import ( "sync" "time" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-cfgdir/main.go b/src/cmd/nncp-cfgdir/main.go index cbc8f8f..7e224e2 100644 --- a/src/cmd/nncp-cfgdir/main.go +++ b/src/cmd/nncp-cfgdir/main.go @@ 
-26,7 +26,7 @@ import ( "os" "github.com/hjson/hjson-go" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-cfgenc/main.go b/src/cmd/nncp-cfgenc/main.go index 1516d23..465963d 100644 --- a/src/cmd/nncp-cfgenc/main.go +++ b/src/cmd/nncp-cfgenc/main.go @@ -28,7 +28,7 @@ import ( "os" xdr "github.com/davecgh/go-xdr/xdr2" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" "golang.org/x/crypto/blake2b" "golang.org/x/term" ) diff --git a/src/cmd/nncp-cfgmin/main.go b/src/cmd/nncp-cfgmin/main.go index 4185302..f0ddb94 100644 --- a/src/cmd/nncp-cfgmin/main.go +++ b/src/cmd/nncp-cfgmin/main.go @@ -25,7 +25,7 @@ import ( "os" "github.com/hjson/hjson-go" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-cfgnew/main.go b/src/cmd/nncp-cfgnew/main.go index 56e4e4f..0ec00e9 100644 --- a/src/cmd/nncp-cfgnew/main.go +++ b/src/cmd/nncp-cfgnew/main.go @@ -30,7 +30,7 @@ import ( "golang.org/x/crypto/blake2b" "golang.org/x/crypto/nacl/box" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-check/main.go b/src/cmd/nncp-check/main.go index b2a931c..721a54f 100644 --- a/src/cmd/nncp-check/main.go +++ b/src/cmd/nncp-check/main.go @@ -26,7 +26,7 @@ import ( "path/filepath" "time" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-cronexpr/main.go b/src/cmd/nncp-cronexpr/main.go index a74fb0e..657cd14 100644 --- a/src/cmd/nncp-cronexpr/main.go +++ b/src/cmd/nncp-cronexpr/main.go @@ -27,7 +27,7 @@ import ( "time" "github.com/gorhill/cronexpr" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 4e9231d..2503ab2 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -29,7 +29,7 @@ import ( "time" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v7" 
+ "go.cypherpunks.ru/nncp/v8" "golang.org/x/net/netutil" ) diff --git a/src/cmd/nncp-exec/main.go b/src/cmd/nncp-exec/main.go index 11a9907..7747f3f 100644 --- a/src/cmd/nncp-exec/main.go +++ b/src/cmd/nncp-exec/main.go @@ -26,7 +26,7 @@ import ( "os" "strings" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { @@ -40,12 +40,12 @@ func usage() { func main() { var ( - useTmp = flag.Bool("use-tmp", false, "Use temporary file, instead of memory buffer") noCompress = flag.Bool("nocompress", false, "Do not compress input data") cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") niceRaw = flag.String("nice", nncp.NicenessFmt(nncp.DefaultNiceExec), "Outbound packet niceness") replyNiceRaw = flag.String("replynice", nncp.NicenessFmt(nncp.DefaultNiceFile), "Possible reply packet niceness") minSize = flag.Uint64("minsize", 0, "Minimal required resulting packet size, in KiB") + argMaxSize = flag.Uint64("maxsize", 0, "Maximal allowable resulting packet size, in KiB") viaOverride = flag.String("via", "", "Override Via path to destination node") spoolPath = flag.String("spool", "", "Override path to spool") logPath = flag.String("log", "", "Override path to logfile") @@ -111,6 +111,11 @@ func main() { } } + maxSize := int64(nncp.MaxFileSize) + if *argMaxSize > 0 { + maxSize = int64(*argMaxSize) * 1024 + } + nncp.ViaOverride(*viaOverride, ctx, node) ctx.Umask() @@ -122,7 +127,7 @@ func main() { flag.Args()[2:], bufio.NewReader(os.Stdin), int64(*minSize)*1024, - *useTmp, + maxSize, *noCompress, areaId, ); err != nil { diff --git a/src/cmd/nncp-file/main.go b/src/cmd/nncp-file/main.go index 6870aec..1d72a9d 100644 --- a/src/cmd/nncp-file/main.go +++ b/src/cmd/nncp-file/main.go @@ -25,7 +25,7 @@ import ( "os" "strings" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { @@ -36,7 +36,8 @@ func usage() { os.Args[0], nncp.AreaDir) flag.PrintDefaults() fmt.Fprint(os.Stderr, ` -If SRC equals to -, then read 
data from stdin to temporary file. +If SRC equals to "-", then data is read from stdin. +If SRC is directory, then create pax archive with its contents. -minsize/-chunked take NODE's freq.minsize/freq.chunked configuration options by default. You can forcefully turn them off by specifying 0 value. @@ -48,6 +49,7 @@ func main() { cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") niceRaw = flag.String("nice", nncp.NicenessFmt(nncp.DefaultNiceFile), "Outbound packet niceness") argMinSize = flag.Int64("minsize", -1, "Minimal required resulting packet size, in KiB") + argMaxSize = flag.Uint64("maxsize", 0, "Maximal allowable resulting packets size, in KiB") argChunkSize = flag.Int64("chunked", -1, "Split file on specified size chunks, in KiB") viaOverride = flag.String("via", "", "Override Via path to destination node") spoolPath = flag.String("spool", "", "Override path to spool") @@ -124,6 +126,13 @@ func main() { nncp.ViaOverride(*viaOverride, ctx, node) ctx.Umask() + var chunkSize int64 + if *argChunkSize < 0 { + chunkSize = node.FreqChunked + } else if *argChunkSize > 0 { + chunkSize = *argChunkSize * 1024 + } + var minSize int64 if *argMinSize < 0 { minSize = node.FreqMinSize @@ -131,14 +140,9 @@ func main() { minSize = *argMinSize * 1024 } - var chunkSize int64 - if *argChunkSize < 0 { - chunkSize = node.FreqChunked - } else if *argChunkSize > 0 { - chunkSize = *argChunkSize * 1024 - } - if chunkSize == 0 { - chunkSize = nncp.MaxFileSize + maxSize := int64(nncp.MaxFileSize) + if *argMaxSize > 0 { + maxSize = int64(*argMaxSize) * 1024 } if err = ctx.TxFile( @@ -148,7 +152,7 @@ func main() { strings.Join(splitted, ":"), chunkSize, minSize, - nncp.MaxFileSize, + maxSize, areaId, ); err != nil { log.Fatalln(err) diff --git a/src/cmd/nncp-freq/main.go b/src/cmd/nncp-freq/main.go index 9596e80..78b565a 100644 --- a/src/cmd/nncp-freq/main.go +++ b/src/cmd/nncp-freq/main.go @@ -26,7 +26,7 @@ import ( "path/filepath" "strings" - 
"go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-hash/main.go b/src/cmd/nncp-hash/main.go index bcdbca5..a350134 100644 --- a/src/cmd/nncp-hash/main.go +++ b/src/cmd/nncp-hash/main.go @@ -28,7 +28,7 @@ import ( "os" "sync" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-log/main.go b/src/cmd/nncp-log/main.go index 82becc6..c6bd120 100644 --- a/src/cmd/nncp-log/main.go +++ b/src/cmd/nncp-log/main.go @@ -25,7 +25,7 @@ import ( "log" "os" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" "go.cypherpunks.ru/recfile" ) diff --git a/src/cmd/nncp-pkt/main.go b/src/cmd/nncp-pkt/main.go index 58a4599..c22b1f7 100644 --- a/src/cmd/nncp-pkt/main.go +++ b/src/cmd/nncp-pkt/main.go @@ -29,7 +29,7 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/klauspost/compress/zstd" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { @@ -219,6 +219,8 @@ func main() { case nncp.MagicNNCPEv4.B: log.Fatalln(nncp.MagicNNCPEv4.TooOld()) case nncp.MagicNNCPEv5.B: + log.Fatalln(nncp.MagicNNCPEv5.TooOld()) + case nncp.MagicNNCPEv6.B: doEncrypted(ctx, pktEnc, *dump, beginning[:nncp.PktEncOverhead]) return } diff --git a/src/cmd/nncp-reass/main.go b/src/cmd/nncp-reass/main.go index a1ee4a3..6a237f4 100644 --- a/src/cmd/nncp-reass/main.go +++ b/src/cmd/nncp-reass/main.go @@ -35,7 +35,7 @@ import ( xdr "github.com/davecgh/go-xdr/xdr2" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index 25110f8..501f5fc 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -29,7 +29,7 @@ import ( "strings" "time" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-stat/main.go b/src/cmd/nncp-stat/main.go index ea5f407..469bbed 100644 --- a/src/cmd/nncp-stat/main.go +++ 
b/src/cmd/nncp-stat/main.go @@ -26,7 +26,7 @@ import ( "sort" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-toss/main.go b/src/cmd/nncp-toss/main.go index 8a3b8c8..c4b0121 100644 --- a/src/cmd/nncp-toss/main.go +++ b/src/cmd/nncp-toss/main.go @@ -26,7 +26,7 @@ import ( "path/filepath" "time" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/cmd/nncp-trns/main.go b/src/cmd/nncp-trns/main.go index 6c6e3d4..ed96f73 100644 --- a/src/cmd/nncp-trns/main.go +++ b/src/cmd/nncp-trns/main.go @@ -27,7 +27,7 @@ import ( "path/filepath" "strings" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { @@ -137,7 +137,15 @@ func main() { if err != nil { panic(err) } - if _, err = ctx.Tx(node, pktTrns, nice, fi.Size(), 0, fd, pktName, nil); err != nil { + if _, _, err = ctx.Tx( + node, + pktTrns, + nice, + fi.Size(), 0, nncp.MaxFileSize, + fd, + pktName, + nil, + ); err != nil { log.Fatalln(err) } } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index e565ece..b65686c 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -29,7 +29,7 @@ import ( "path/filepath" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v7" + "go.cypherpunks.ru/nncp/v8" ) func usage() { diff --git a/src/go.mod b/src/go.mod index 80c8417..73fa5aa 100644 --- a/src/go.mod +++ b/src/go.mod @@ -1,4 +1,4 @@ -module go.cypherpunks.ru/nncp/v7 +module go.cypherpunks.ru/nncp/v8 require ( github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 diff --git a/src/jobs.go b/src/jobs.go index 7e920fd..ae18b74 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -181,6 +181,8 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job { case MagicNNCPEv4.B: err = MagicNNCPEv4.TooOld() case MagicNNCPEv5.B: + err = MagicNNCPEv5.TooOld() + case MagicNNCPEv6.B: default: err = BadMagic } diff --git a/src/magic.go 
b/src/magic.go index 938d333..f0fdb58 100644 --- a/src/magic.go +++ b/src/magic.go @@ -67,7 +67,11 @@ var ( } MagicNNCPEv5 = Magic{ B: [8]byte{'N', 'N', 'C', 'P', 'E', 0, 0, 5}, - Name: "NNCPEv5 (encrypted packet v5)", Till: "now", + Name: "NNCPEv5 (encrypted packet v5)", Till: "7.7.0", + } + MagicNNCPEv6 = Magic{ + B: [8]byte{'N', 'N', 'C', 'P', 'E', 0, 0, 6}, + Name: "NNCPEv6 (encrypted packet v6)", Till: "now", } MagicNNCPSv1 = Magic{ B: [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}, diff --git a/src/nncp.go b/src/nncp.go index e6ba597..3bf6d1a 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -40,7 +40,7 @@ along with this program. If not, see .` const Base32Encoded32Len = 52 var ( - Version string = "7.7.0" + Version string = "8.0.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/pkt.go b/src/pkt.go index aa3025d..a1993e7 100644 --- a/src/pkt.go +++ b/src/pkt.go @@ -21,7 +21,6 @@ import ( "bytes" "crypto/cipher" "crypto/rand" - "encoding/binary" "errors" "io" @@ -49,15 +48,20 @@ const ( MaxPathSize = 1<<8 - 1 NNCPBundlePrefix = "NNCP" - - PktSizeOverhead = 8 + poly1305.TagSize ) var ( BadPktType error = errors.New("Unknown packet type") - PktOverhead int64 - PktEncOverhead int64 + DeriveKeyFullCtx = string(MagicNNCPEv6.B[:]) + " FULL" + DeriveKeySizeCtx = string(MagicNNCPEv6.B[:]) + " SIZE" + DeriveKeyPadCtx = string(MagicNNCPEv6.B[:]) + " PAD" + + PktOverhead int64 + PktEncOverhead int64 + PktSizeOverhead int64 + + TooBig = errors.New("Too big than allowed") ) type Pkt struct { @@ -85,9 +89,28 @@ type PktEnc struct { Sign [ed25519.SignatureSize]byte } +type PktSize struct { + Payload uint64 + Pad uint64 +} + +func NewPkt(typ PktType, nice uint8, path []byte) (*Pkt, error) { + if len(path) > MaxPathSize { + return nil, errors.New("Too long path") + } + pkt := Pkt{ + Magic: MagicNNCPPv3.B, + Type: typ, + Nice: nice, + PathLen: uint8(len(path)), + } + copy(pkt.Path[:], path) + return &pkt, nil +} + func init() { - pkt 
:= Pkt{Type: PktTypeFile} var buf bytes.Buffer + pkt := Pkt{Type: PktTypeFile} n, err := xdr.Marshal(&buf, pkt) if err != nil { panic(err) @@ -100,7 +123,7 @@ func init() { panic(err) } pktEnc := PktEnc{ - Magic: MagicNNCPEv5.B, + Magic: MagicNNCPEv6.B, Sender: dummyId, Recipient: dummyId, } @@ -109,20 +132,14 @@ func init() { panic(err) } PktEncOverhead = int64(n) -} + buf.Reset() -func NewPkt(typ PktType, nice uint8, path []byte) (*Pkt, error) { - if len(path) > MaxPathSize { - return nil, errors.New("Too long path") - } - pkt := Pkt{ - Magic: MagicNNCPPv3.B, - Type: typ, - Nice: nice, - PathLen: uint8(len(path)), + size := PktSize{} + n, err = xdr.Marshal(&buf, size) + if err != nil { + panic(err) } - copy(pkt.Path[:], path) - return &pkt, nil + PktSizeOverhead = int64(n) } func ctrIncr(b []byte) { @@ -135,53 +152,28 @@ func ctrIncr(b []byte) { panic("counter overflow") } -func aeadProcess( - aead cipher.AEAD, - nonce, ad []byte, - doEncrypt bool, - r io.Reader, - w io.Writer, -) (int64, error) { - ciphCtr := nonce[len(nonce)-8:] - buf := make([]byte, EncBlkSize+aead.Overhead()) - var toRead []byte - var toWrite []byte - var n int - var readBytes int64 - var err error - if doEncrypt { - toRead = buf[:EncBlkSize] - } else { - toRead = buf +func TbsPrepare(our *NodeOur, their *Node, pktEnc *PktEnc) []byte { + tbs := PktTbs{ + Magic: MagicNNCPEv6.B, + Nice: pktEnc.Nice, + Sender: their.Id, + Recipient: our.Id, + ExchPub: pktEnc.ExchPub, } - for { - n, err = io.ReadFull(r, toRead) - if err != nil { - if err == io.EOF { - break - } - if err != io.ErrUnexpectedEOF { - return readBytes + int64(n), err - } - } - readBytes += int64(n) - ctrIncr(ciphCtr) - if doEncrypt { - toWrite = aead.Seal(buf[:0], nonce, buf[:n], ad) - } else { - toWrite, err = aead.Open(buf[:0], nonce, buf[:n], ad) - if err != nil { - return readBytes, err - } - } - if _, err = w.Write(toWrite); err != nil { - return readBytes, err - } + var tbsBuf bytes.Buffer + if _, err := xdr.Marshal(&tbsBuf, 
&tbs); err != nil { + panic(err) } - return readBytes, nil + return tbsBuf.Bytes() +} + +func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) ([]byte, bool, error) { + tbs := TbsPrepare(our, their, pktEnc) + return tbs, ed25519.Verify(their.SignPub, tbs, pktEnc.Sign[:]), nil } func sizeWithTags(size int64) (fullSize int64) { + size += PktSizeOverhead fullSize = size + (size/EncBlkSize)*poly1305.TagSize if size%EncBlkSize != 0 { fullSize += poly1305.TagSize @@ -189,122 +181,182 @@ func sizeWithTags(size int64) (fullSize int64) { return } +func sizePadCalc(sizePayload, minSize int64, wrappers int) (sizePad int64) { + expectedSize := sizePayload - PktOverhead + for i := 0; i < wrappers; i++ { + expectedSize = PktEncOverhead + sizeWithTags(PktOverhead+expectedSize) + } + sizePad = minSize - expectedSize + if sizePad < 0 { + sizePad = 0 + } + return +} + func PktEncWrite( - our *NodeOur, - their *Node, - pkt *Pkt, - nice uint8, - size, padSize int64, - data io.Reader, - out io.Writer, -) ([]byte, error) { - pubEph, prvEph, err := box.GenerateKey(rand.Reader) + our *NodeOur, their *Node, + pkt *Pkt, nice uint8, + minSize, maxSize int64, wrappers int, + r io.Reader, w io.Writer, +) (pktEncRaw []byte, size int64, err error) { + pub, prv, err := box.GenerateKey(rand.Reader) if err != nil { - return nil, err + return nil, 0, err } - var pktBuf bytes.Buffer - if _, err := xdr.Marshal(&pktBuf, pkt); err != nil { - return nil, err + + var buf bytes.Buffer + _, err = xdr.Marshal(&buf, pkt) + if err != nil { + return } + pktRaw := make([]byte, buf.Len()) + copy(pktRaw, buf.Bytes()) + buf.Reset() + tbs := PktTbs{ - Magic: MagicNNCPEv5.B, + Magic: MagicNNCPEv6.B, Nice: nice, Sender: our.Id, Recipient: their.Id, - ExchPub: *pubEph, + ExchPub: *pub, } - var tbsBuf bytes.Buffer - if _, err = xdr.Marshal(&tbsBuf, &tbs); err != nil { - return nil, err + _, err = xdr.Marshal(&buf, &tbs) + if err != nil { + return } signature := new([ed25519.SignatureSize]byte) - copy(signature[:], 
ed25519.Sign(our.SignPrv, tbsBuf.Bytes())) + copy(signature[:], ed25519.Sign(our.SignPrv, buf.Bytes())) + ad := blake3.Sum256(buf.Bytes()) + buf.Reset() + pktEnc := PktEnc{ - Magic: MagicNNCPEv5.B, + Magic: MagicNNCPEv6.B, Nice: nice, Sender: our.Id, Recipient: their.Id, - ExchPub: *pubEph, + ExchPub: *pub, Sign: *signature, } - ad := blake3.Sum256(tbsBuf.Bytes()) - tbsBuf.Reset() - if _, err = xdr.Marshal(&tbsBuf, &pktEnc); err != nil { - return nil, err + _, err = xdr.Marshal(&buf, &pktEnc) + if err != nil { + return } - pktEncRaw := tbsBuf.Bytes() - if _, err = out.Write(pktEncRaw); err != nil { - return nil, err + pktEncRaw = make([]byte, buf.Len()) + copy(pktEncRaw, buf.Bytes()) + buf.Reset() + _, err = w.Write(pktEncRaw) + if err != nil { + return } - sharedKey := new([32]byte) - curve25519.ScalarMult(sharedKey, prvEph, their.ExchPub) - key := make([]byte, chacha20poly1305.KeySize) - blake3.DeriveKey(key, string(MagicNNCPEv5.B[:]), sharedKey[:]) - aead, err := chacha20poly1305.New(key) + sharedKey := new([32]byte) + curve25519.ScalarMult(sharedKey, prv, their.ExchPub) + keyFull := make([]byte, chacha20poly1305.KeySize) + keySize := make([]byte, chacha20poly1305.KeySize) + blake3.DeriveKey(keyFull, DeriveKeyFullCtx, sharedKey[:]) + blake3.DeriveKey(keySize, DeriveKeySizeCtx, sharedKey[:]) + aeadFull, err := chacha20poly1305.New(keyFull) + if err != nil { + return + } + aeadSize, err := chacha20poly1305.New(keySize) if err != nil { - return nil, err + return } - nonce := make([]byte, aead.NonceSize()) + nonce := make([]byte, aeadFull.NonceSize()) - fullSize := int64(pktBuf.Len()) + size - sizeBuf := make([]byte, 8+aead.Overhead()) - binary.BigEndian.PutUint64(sizeBuf, uint64(sizeWithTags(fullSize))) - if _, err = out.Write(aead.Seal(sizeBuf[:0], nonce, sizeBuf[:8], ad[:])); err != nil { - return nil, err + data := make([]byte, EncBlkSize, EncBlkSize+aeadFull.Overhead()) + mr := io.MultiReader(bytes.NewReader(pktRaw), r) + var sizePayload int64 + var n int + var 
ct []byte + for { + n, err = io.ReadFull(mr, data) + sizePayload += int64(n) + if sizePayload > maxSize { + err = TooBig + return + } + if err == nil { + ct = aeadFull.Seal(data[:0], nonce, data[:n], ad[:]) + _, err = w.Write(ct) + if err != nil { + return + } + ctrIncr(nonce) + continue + } + if !(err == io.EOF || err == io.ErrUnexpectedEOF) { + return + } + break } - lr := io.LimitedReader{R: data, N: size} - mr := io.MultiReader(&pktBuf, &lr) - written, err := aeadProcess(aead, nonce, ad[:], true, mr, out) + sizePad := sizePadCalc(sizePayload, minSize, wrappers) + _, err = xdr.Marshal(&buf, &PktSize{uint64(sizePayload), uint64(sizePad)}) if err != nil { - return nil, err - } - if written != fullSize { - return nil, io.ErrUnexpectedEOF + return } - if padSize > 0 { - blake3.DeriveKey(key, string(MagicNNCPEv5.B[:])+" PAD", sharedKey[:]) - xof := blake3.New(32, key).XOF() - if _, err = io.CopyN(out, xof, padSize); err != nil { - return nil, err + + var aeadLast cipher.AEAD + if n+int(PktSizeOverhead) > EncBlkSize { + left := make([]byte, (n+int(PktSizeOverhead))-EncBlkSize) + copy(left, data[n-len(left):]) + copy(data[PktSizeOverhead:], data[:n-len(left)]) + copy(data[:PktSizeOverhead], buf.Bytes()) + ct = aeadSize.Seal(data[:0], nonce, data[:EncBlkSize], ad[:]) + _, err = w.Write(ct) + if err != nil { + return } + ctrIncr(nonce) + copy(data, left) + n = len(left) + aeadLast = aeadFull + } else { + copy(data[PktSizeOverhead:], data[:n]) + copy(data[:PktSizeOverhead], buf.Bytes()) + n += int(PktSizeOverhead) + aeadLast = aeadSize } - return pktEncRaw, nil -} -func TbsPrepare(our *NodeOur, their *Node, pktEnc *PktEnc) []byte { - tbs := PktTbs{ - Magic: MagicNNCPEv5.B, - Nice: pktEnc.Nice, - Sender: their.Id, - Recipient: our.Id, - ExchPub: pktEnc.ExchPub, + var sizeBlockPadded int + var sizePadLeft int64 + if sizePad > EncBlkSize-int64(n) { + sizeBlockPadded = EncBlkSize + sizePadLeft = sizePad - (EncBlkSize - int64(n)) + } else { + sizeBlockPadded = n + int(sizePad) 
+ sizePadLeft = 0 } - var tbsBuf bytes.Buffer - if _, err := xdr.Marshal(&tbsBuf, &tbs); err != nil { - panic(err) + for i := n; i < sizeBlockPadded; i++ { + data[i] = 0 + } + ct = aeadLast.Seal(data[:0], nonce, data[:sizeBlockPadded], ad[:]) + _, err = w.Write(ct) + if err != nil { + return } - return tbsBuf.Bytes() -} -func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) ([]byte, bool, error) { - tbs := TbsPrepare(our, their, pktEnc) - return tbs, ed25519.Verify(their.SignPub, tbs, pktEnc.Sign[:]), nil + size = sizePayload + if sizePadLeft > 0 { + keyPad := make([]byte, chacha20poly1305.KeySize) + blake3.DeriveKey(keyPad, DeriveKeyPadCtx, sharedKey[:]) + _, err = io.CopyN(w, blake3.New(32, keyPad).XOF(), sizePadLeft) + } + return } func PktEncRead( - our *NodeOur, - nodes map[NodeId]*Node, - data io.Reader, - out io.Writer, + our *NodeOur, nodes map[NodeId]*Node, + r io.Reader, w io.Writer, signatureVerify bool, sharedKeyCached []byte, -) ([]byte, *Node, int64, error) { +) (sharedKey []byte, their *Node, size int64, err error) { var pktEnc PktEnc - _, err := xdr.Unmarshal(data, &pktEnc) + _, err = xdr.Unmarshal(r, &pktEnc) if err != nil { - return nil, nil, 0, err + return } switch pktEnc.Magic { case MagicNNCPEv1.B: @@ -316,66 +368,159 @@ func PktEncRead( case MagicNNCPEv4.B: err = MagicNNCPEv4.TooOld() case MagicNNCPEv5.B: + err = MagicNNCPEv5.TooOld() + case MagicNNCPEv6.B: default: err = BadMagic } if err != nil { - return nil, nil, 0, err + return } if *pktEnc.Recipient != *our.Id { - return nil, nil, 0, errors.New("Invalid recipient") + err = errors.New("Invalid recipient") + return } + var tbsRaw []byte - var their *Node if signatureVerify { their = nodes[*pktEnc.Sender] if their == nil { - return nil, nil, 0, errors.New("Unknown sender") + err = errors.New("Unknown sender") + return } var verified bool tbsRaw, verified, err = TbsVerify(our, their, &pktEnc) if err != nil { - return nil, nil, 0, err + return } if !verified { - return nil, their, 0, 
errors.New("Invalid signature") + err = errors.New("Invalid signature") + return } } else { tbsRaw = TbsPrepare(our, &Node{Id: pktEnc.Sender}, &pktEnc) } ad := blake3.Sum256(tbsRaw) - sharedKey := new([32]byte) if sharedKeyCached == nil { - curve25519.ScalarMult(sharedKey, our.ExchPrv, &pktEnc.ExchPub) + key := new([32]byte) + curve25519.ScalarMult(key, our.ExchPrv, &pktEnc.ExchPub) + sharedKey = key[:] } else { - copy(sharedKey[:], sharedKeyCached) + sharedKey = sharedKeyCached } - key := make([]byte, chacha20poly1305.KeySize) - blake3.DeriveKey(key, string(MagicNNCPEv5.B[:]), sharedKey[:]) - aead, err := chacha20poly1305.New(key) + keyFull := make([]byte, chacha20poly1305.KeySize) + keySize := make([]byte, chacha20poly1305.KeySize) + blake3.DeriveKey(keyFull, DeriveKeyFullCtx, sharedKey[:]) + blake3.DeriveKey(keySize, DeriveKeySizeCtx, sharedKey[:]) + aeadFull, err := chacha20poly1305.New(keyFull) if err != nil { - return sharedKey[:], their, 0, err + return + } + aeadSize, err := chacha20poly1305.New(keySize) + if err != nil { + return + } + nonce := make([]byte, aeadFull.NonceSize()) + + ct := make([]byte, EncBlkSize+aeadFull.Overhead()) + pt := make([]byte, EncBlkSize) + var n int +FullRead: + for { + n, err = io.ReadFull(r, ct) + switch err { + case nil: + pt, err = aeadFull.Open(pt[:0], nonce, ct, ad[:]) + if err != nil { + break FullRead + } + size += EncBlkSize + _, err = w.Write(pt) + if err != nil { + return + } + ctrIncr(nonce) + continue + case io.ErrUnexpectedEOF: + break FullRead + default: + return + } } - nonce := make([]byte, aead.NonceSize()) - sizeBuf := make([]byte, 8+aead.Overhead()) - if _, err = io.ReadFull(data, sizeBuf); err != nil { - return sharedKey[:], their, 0, err + pt, err = aeadSize.Open(pt[:0], nonce, ct[:n], ad[:]) + if err != nil { + return } - sizeBuf, err = aead.Open(sizeBuf[:0], nonce, sizeBuf, ad[:]) + var pktSize PktSize + _, err = xdr.Unmarshal(bytes.NewReader(pt), &pktSize) if err != nil { - return sharedKey[:], their, 0, 
err + return } - size := int64(binary.BigEndian.Uint64(sizeBuf)) + pt = pt[PktSizeOverhead:] - lr := io.LimitedReader{R: data, N: size} - written, err := aeadProcess(aead, nonce, ad[:], false, &lr, out) + left := int64(pktSize.Payload) - size + for left > int64(len(pt)) { + size += int64(len(pt)) + left -= int64(len(pt)) + _, err = w.Write(pt) + if err != nil { + return + } + n, err = io.ReadFull(r, ct) + if err != nil && err != io.ErrUnexpectedEOF { + return + } + ctrIncr(nonce) + pt, err = aeadFull.Open(pt[:0], nonce, ct[:n], ad[:]) + if err != nil { + return + } + } + size += left + _, err = w.Write(pt[:left]) if err != nil { - return sharedKey[:], their, written, err + return + } + pt = pt[left:] + + if pktSize.Pad < uint64(len(pt)) { + err = errors.New("unexpected pad") + return } - if written != size { - return sharedKey[:], their, written, io.ErrUnexpectedEOF + for i := 0; i < len(pt); i++ { + if pt[i] != 0 { + err = errors.New("non-zero pad byte") + return + } } - return sharedKey[:], their, size, nil + sizePad := int64(pktSize.Pad) - int64(len(pt)) + if sizePad == 0 { + return + } + + keyPad := make([]byte, chacha20poly1305.KeySize) + blake3.DeriveKey(keyPad, DeriveKeyPadCtx, sharedKey[:]) + xof := blake3.New(32, keyPad).XOF() + pt = make([]byte, len(ct)) + for sizePad > 0 { + n, err = io.ReadFull(r, ct) + if err != nil && err != io.ErrUnexpectedEOF { + return + } + _, err = io.ReadFull(xof, pt[:n]) + if err != nil { + panic(err) + } + if bytes.Compare(ct[:n], pt[:n]) != 0 { + err = errors.New("wrong pad value") + return + } + sizePad -= int64(n) + } + if sizePad < 0 { + err = errors.New("excess pad") + } + return } diff --git a/src/pkt_test.go b/src/pkt_test.go index 62efa71..4f59190 100644 --- a/src/pkt_test.go +++ b/src/pkt_test.go @@ -19,6 +19,8 @@ package nncp import ( "bytes" + "crypto/rand" + "io" "testing" "testing/quick" @@ -34,24 +36,37 @@ func TestPktEncWrite(t *testing.T) { if err != nil { panic(err) } - f := func(path string, pathSize uint8, 
data [1 << 16]byte, size, padSize uint16) bool { - dataR := bytes.NewReader(data[:]) + f := func( + path string, + pathSize uint8, + dataSize uint32, + size, minSize uint16, + wrappers uint8, + ) bool { + dataSize %= 1 << 20 + data := make([]byte, dataSize) + if _, err = io.ReadFull(rand.Reader, data); err != nil { + panic(err) + } var ct bytes.Buffer if len(path) > int(pathSize) { path = path[:int(pathSize)] } - pkt, err := NewPkt(PktTypeFile, 123, []byte(path)) + nice := uint8(123) + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { panic(err) } - _, err = PktEncWrite( + wrappers %= 8 + _, _, err = PktEncWrite( nodeOur, nodeTheir.Their(), pkt, - 123, - int64(size), - int64(padSize), - dataR, + nice, + int64(minSize), + MaxFileSize, + int(wrappers), + bytes.NewReader(data), &ct, ) if err != nil { @@ -83,32 +98,39 @@ func TestPktEncRead(t *testing.T) { f := func( path string, pathSize uint8, - data [1 << 16]byte, - size, padSize uint16, - junk []byte) bool { - dataR := bytes.NewReader(data[:]) + dataSize uint32, + minSize uint16, + wrappers uint8, + ) bool { + dataSize %= 1 << 20 + data := make([]byte, dataSize) + if _, err = io.ReadFull(rand.Reader, data); err != nil { + panic(err) + } var ct bytes.Buffer if len(path) > int(pathSize) { path = path[:int(pathSize)] } - pkt, err := NewPkt(PktTypeFile, 123, []byte(path)) + nice := uint8(123) + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { panic(err) } - _, err = PktEncWrite( + wrappers %= 8 + _, _, err = PktEncWrite( node1, node2.Their(), pkt, - 123, - int64(size), - int64(padSize), - dataR, + nice, + int64(minSize), + MaxFileSize, + int(wrappers), + bytes.NewReader(data), &ct, ) if err != nil { return false } - ct.Write(junk) var pt bytes.Buffer nodes := make(map[NodeId]*Node) nodes[*node1.Id] = node1.Their() @@ -119,12 +141,12 @@ func TestPktEncRead(t *testing.T) { if *node.Id != *node1.Id { return false } - if sizeGot != sizeWithTags(PktOverhead+int64(size)) { + if sizeGot != 
int64(len(data)+int(PktOverhead)) { return false } var pktBuf bytes.Buffer xdr.Marshal(&pktBuf, &pkt) - return bytes.Compare(pt.Bytes(), append(pktBuf.Bytes(), data[:int(size)]...)) == 0 + return bytes.Compare(pt.Bytes(), append(pktBuf.Bytes(), data...)) == 0 } if err := quick.Check(f, nil); err != nil { t.Error(err) diff --git a/src/progress.go b/src/progress.go index 5ebc72b..b11a3ac 100644 --- a/src/progress.go +++ b/src/progress.go @@ -25,7 +25,7 @@ import ( "time" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v7/uilive" + "go.cypherpunks.ru/nncp/v8/uilive" ) func init() { @@ -114,6 +114,18 @@ func CopyProgressed( break } } + if showPrgrs { + for _, le := range les { + if le.K == "FullSize" { + if le.V.(int64) == 0 { + Progress(prgrsPrefix, append( + les, LE{"Size", written}, LE{"FullSize", written}, + )) + } + break + } + } + } return } @@ -146,7 +158,7 @@ func Progress(prefix string, les LEs) { } what = prefix + " " + what pb.Render(what, size) - if size >= fullsize { + if fullsize != 0 && size >= fullsize { pb.Kill() progressBarsLock.Lock() delete(progressBars, pkt) diff --git a/src/toss.go b/src/toss.go index 7e7ce17..0c27289 100644 --- a/src/toss.go +++ b/src/toss.go @@ -619,11 +619,11 @@ func jobProcess( if err != nil { panic(err) } - if _, err = ctx.Tx( + if _, _, err = ctx.Tx( node, pktTrns, nice, - int64(pktSize), 0, + int64(pktSize), 0, MaxFileSize, pipeR, pktName, nil, @@ -750,8 +750,14 @@ func jobProcess( } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, err = ctx.Tx( - node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil, + if _, _, err = ctx.Tx( + node, + &pkt, + nice, + int64(pktSize), 0, MaxFileSize, + fullPipeR, + pktName, + nil, ); err != nil { ctx.LogE("rx-area", lesEcho, err, logMsgNode) return err diff --git a/src/toss_test.go b/src/toss_test.go index deb70f1..299e80f 100644 --- a/src/toss_test.go +++ b/src/toss_test.go @@ -97,8 +97,7 @@ func TestTossExec(t 
*testing.T) { handle, []string{"arg0", "arg1"}, strings.NewReader("BODY\n"), - 1<<15, - false, + 1<<15, MaxFileSize, false, nil, ); err != nil { @@ -165,6 +164,10 @@ func TestTossFile(t *testing.T) { } files := make(map[string][]byte) for i, fileSize := range fileSizes { + if fileSize == 0 { + // to prevent chunked send + fileSize++ + } data := make([]byte, fileSize) if _, err := io.ReadFull(rand.Reader, data); err != nil { panic(err) @@ -222,8 +225,10 @@ func TestTossFile(t *testing.T) { return false } ctx.Neigh[*nodeOur.Id].Incoming = &incomingPath - ctx.Toss(ctx.Self.Id, TRx, DefaultNiceFile, - false, false, false, false, false, false, false) + if ctx.Toss(ctx.Self.Id, TRx, DefaultNiceFile, + false, false, false, false, false, false, false) { + return false + } if len(dirFiles(rxPath)) != 0 { return false } @@ -347,6 +352,10 @@ func TestTossFreq(t *testing.T) { ctx.Neigh[*nodeOur.Id] = nodeOur.Their() files := make(map[string][]byte) for i, fileSize := range fileSizes { + if fileSize == 0 { + // to prevent chunked send + fileSize++ + } fileData := make([]byte, fileSize) if _, err := io.ReadFull(rand.Reader, fileData); err != nil { panic(err) @@ -472,13 +481,12 @@ func TestTossTrns(t *testing.T) { } copy(pktTrans.Path[:], nodeOur.Id[:]) var dst bytes.Buffer - if _, err := PktEncWrite( + if _, _, err := PktEncWrite( ctx.Self, ctx.Neigh[*nodeOur.Id], &pktTrans, 123, - int64(len(data)), - 0, + 0, MaxFileSize, 1, bytes.NewReader(data), &dst, ); err != nil { diff --git a/src/tx.go b/src/tx.go index e096ed6..0f7d2b8 100644 --- a/src/tx.go +++ b/src/tx.go @@ -21,12 +21,9 @@ import ( "archive/tar" "bufio" "bytes" - "crypto/rand" "errors" "fmt" - "hash" "io" - "io/ioutil" "os" "path/filepath" "strconv" @@ -37,7 +34,6 @@ import ( "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" - "golang.org/x/crypto/chacha20poly1305" ) const ( @@ -47,20 +43,26 @@ const ( TarExt = ".tar" ) +type PktEncWriteResult struct { + pktEncRaw []byte 
+ size int64 + err error +} + func (ctx *Ctx) Tx( node *Node, pkt *Pkt, nice uint8, - size, minSize int64, + srcSize, minSize, maxSize int64, src io.Reader, pktName string, areaId *AreaId, -) (*Node, error) { +) (*Node, int64, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, errors.New("area has no encryption keys") + return nil, 0, errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -70,85 +72,76 @@ func (ctx *Ctx) Tx( lastNode = ctx.Neigh[*node.Via[i-1]] hops = append(hops, lastNode) } - expectedSize := size wrappers := len(hops) if area != nil { wrappers++ } - for i := 0; i < wrappers; i++ { - expectedSize = PktEncOverhead + - PktSizeOverhead + - sizeWithTags(PktOverhead+expectedSize) - } - padSize := minSize - expectedSize - if padSize < 0 { - padSize = 0 - } - if !ctx.IsEnoughSpace(size + padSize) { - return nil, errors.New("is not enough space") + var expectedSize int64 + if srcSize > 0 { + expectedSize = srcSize + PktOverhead + expectedSize += sizePadCalc(expectedSize, minSize, wrappers) + expectedSize = PktEncOverhead + sizeWithTags(expectedSize) + if maxSize != 0 && expectedSize > maxSize { + return nil, 0, TooBig + } + if !ctx.IsEnoughSpace(expectedSize) { + return nil, 0, errors.New("is not enough space") + } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, err + return nil, 0, err } - errs := make(chan error) - pktEncRaws := make(chan []byte) - curSize := size + results := make(chan PktEncWriteResult) pipeR, pipeW := io.Pipe() var pipeRPrev io.Reader if area == nil { - go func(size int64, src io.Reader, dst io.WriteCloser) { + go func(src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, }, func(les LEs) string { return fmt.Sprintf( - "Tx packet to %s (%s) nice: %s", + "Tx packet to %s (source %s) nice: %s", ctx.NodeName(hops[0].Id), - 
humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), ) }) - pktEncRaw, err := PktEncWrite( - ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + pktEncRaw, size, err := PktEncWrite( + ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst, ) - pktEncRaws <- pktEncRaw - errs <- err + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) - curSize += padSize + }(src, pipeW) } else { - go func(size, padSize int64, src io.Reader, dst io.WriteCloser) { + go func(src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Area", area.Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, }, func(les LEs) string { return fmt.Sprintf( - "Tx area packet to %s (%s) nice: %s", + "Tx area packet to %s (source %s) nice: %s", ctx.AreaName(areaId), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), ) }) areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)} copy(areaNode.Id[:], area.Id[:]) copy(areaNode.ExchPub[:], area.Pub[:]) - pktEncRaw, err := PktEncWrite( - ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst, + pktEncRaw, size, err := PktEncWrite( + ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst, ) - pktEncRaws <- pktEncRaw - errs <- err + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(curSize, padSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) - curSize += padSize + }(src, pipeW) pipeRPrev = pipeR pipeR, pipeW = io.Pipe() - go func(size int64, src io.Reader, dst io.WriteCloser) { + go func(src io.Reader, dst io.WriteCloser) { pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:]) if err != nil { panic(err) @@ -156,23 +149,21 @@ func (ctx *Ctx) Tx( ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, }, func(les LEs) 
string { return fmt.Sprintf( - "Tx packet to %s (%s) nice: %s", + "Tx packet to %s (source %s) nice: %s", ctx.NodeName(hops[0].Id), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), ) }) - pktEncRaw, err := PktEncWrite( - ctx.Self, hops[0], pktArea, nice, size, 0, src, dst, + pktEncRaw, size, err := PktEncWrite( + ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst, ) - pktEncRaws <- pktEncRaw - errs <- err + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(curSize, pipeRPrev, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + }(pipeRPrev, pipeW) } for i := 1; i < len(hops); i++ { pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) @@ -181,54 +172,54 @@ func (ctx *Ctx) Tx( } pipeRPrev = pipeR pipeR, pipeW = io.Pipe() - go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { + go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", node.Id}, {"Nice", int(nice)}, - {"Size", size}, }, func(les LEs) string { return fmt.Sprintf( - "Tx trns packet to %s (%s) nice: %s", + "Tx trns packet to %s nice: %s", ctx.NodeName(node.Id), - humanize.IBytes(uint64(size)), NicenessFmt(nice), ) }) - pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) - pktEncRaws <- pktEncRaw - errs <- err + pktEncRaw, size, err := PktEncWrite( + ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst, + ) + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + }(hops[i], pktTrns, pipeRPrev, pipeW) } go func() { _, err := CopyProgressed( tmp.W, pipeR, "Tx", - LEs{{"Pkt", pktName}, {"FullSize", curSize}}, + LEs{{"Pkt", pktName}, {"FullSize", expectedSize}}, ctx.ShowPrgrs, ) - errs <- err + results <- PktEncWriteResult{err: err} }() var pktEncRaw []byte var 
pktEncMsg []byte if area != nil { - pktEncMsg = <-pktEncRaws - } - for i := 0; i < len(hops); i++ { - pktEncRaw = <-pktEncRaws + pktEncMsg = (<-results).pktEncRaw } + var finalSize int64 for i := 0; i <= wrappers; i++ { - err = <-errs - if err != nil { + r := <-results + if r.err != nil { tmp.Fd.Close() - return nil, err + return nil, 0, err + } + if r.pktEncRaw != nil { + finalSize = r.size + pktEncRaw = r.pktEncRaw } } nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, err + return lastNode, 0, err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -243,15 +234,15 @@ func (ctx *Ctx) Tx( les := LEs{ {"Node", node.Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, {"Area", areaId}, {"AreaMsg", msgHash}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "Tx area packet to %s (%s) nice: %s, area %s: %s", + "Tx area packet to %s (source %s) nice: %s, area %s: %s", ctx.NodeName(node.Id), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), area.Name, msgHash, @@ -259,84 +250,34 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, err + return lastNode, 0, err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, err + return lastNode, 0, err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, err + return lastNode, finalSize, err } type DummyCloser struct{} func (dc DummyCloser) Close() error { return nil } -func throughTmpFile(r io.Reader) ( - reader io.Reader, - closer io.Closer, - fileSize int64, - rerr error, -) { - src, err := ioutil.TempFile("", "nncp-file") - if err != nil { - rerr = err - return - } - 
os.Remove(src.Name()) - tmpW := bufio.NewWriter(src) - tmpKey := make([]byte, chacha20poly1305.KeySize) - if _, rerr = rand.Read(tmpKey[:]); rerr != nil { - return - } - aead, err := chacha20poly1305.New(tmpKey) - if err != nil { - rerr = err - return - } - nonce := make([]byte, aead.NonceSize()) - written, err := aeadProcess(aead, nonce, nil, true, r, tmpW) - if err != nil { - rerr = err - return - } - fileSize = int64(written) - if err = tmpW.Flush(); err != nil { - rerr = err - return - } - if _, err = src.Seek(0, io.SeekStart); err != nil { - rerr = err - return - } - r, w := io.Pipe() - go func() { - for i := 0; i < aead.NonceSize(); i++ { - nonce[i] = 0 - } - if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil { - w.CloseWithError(err) - } - }() - reader = r - closer = src - return -} - func prepareTxFile(srcPath string) ( reader io.Reader, closer io.Closer, - fileSize int64, + srcSize int64, archived bool, rerr error, ) { if srcPath == "-" { - reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin)) + reader = os.Stdin + closer = os.Stdin return } @@ -354,9 +295,9 @@ func prepareTxFile(srcPath string) ( rerr = err return } - fileSize = srcStat.Size() - reader = bufio.NewReader(src) + reader = src closer = src + srcSize = srcStat.Size() return } @@ -386,13 +327,13 @@ func prepareTxFile(srcPath string) ( } if info.IsDir() { // directory header, PAX record header+contents - fileSize += TarBlockSize + 2*TarBlockSize + srcSize += TarBlockSize + 2*TarBlockSize dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) } else { // file header, PAX record header+contents, file content - fileSize += TarBlockSize + 2*TarBlockSize + info.Size() + srcSize += TarBlockSize + 2*TarBlockSize + info.Size() if n := info.Size() % TarBlockSize; n != 0 { - fileSize += TarBlockSize - n // padding + srcSize += TarBlockSize - n // padding } files = append(files, einfo{ path: path, @@ -409,7 +350,7 @@ func 
prepareTxFile(srcPath string) ( r, w := io.Pipe() reader = r closer = DummyCloser{} - fileSize += 2 * TarBlockSize // termination block + srcSize += 2 * TarBlockSize // termination block go func() error { tarWr := tar.NewWriter(w) @@ -460,8 +401,7 @@ func (ctx *Ctx) TxFile( node *Node, nice uint8, srcPath, dstPath string, - chunkSize int64, - minSize, maxSize int64, + chunkSize, minSize, maxSize int64, areaId *AreaId, ) error { dstPathSpecified := false @@ -477,39 +417,40 @@ func (ctx *Ctx) TxFile( if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") } - reader, closer, fileSize, archived, err := prepareTxFile(srcPath) + reader, closer, srcSize, archived, err := prepareTxFile(srcPath) if closer != nil { defer closer.Close() } if err != nil { return err } - if fileSize > maxSize { - return errors.New("Too big than allowed") - } if archived && !dstPathSpecified { dstPath += TarExt } - if fileSize <= chunkSize { + if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) { pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId) + _, finalSize, err := ctx.Tx( + node, pkt, nice, + srcSize, minSize, maxSize, + bufio.NewReader(reader), dstPath, areaId, + ) les := LEs{ {"Type", "file"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", dstPath}, - {"Size", fileSize}, + {"Size", finalSize}, } logMsg := func(les LEs) string { return fmt.Sprintf( "File %s (%s) sent to %s:%s", srcPath, - humanize.IBytes(uint64(fileSize)), + humanize.IBytes(uint64(finalSize)), ctx.NodeName(node.Id), dstPath, ) @@ -522,57 +463,38 @@ func (ctx *Ctx) TxFile( return err } - leftSize := fileSize - metaPkt := ChunkedMeta{ - Magic: MagicNNCPMv2.B, - FileSize: uint64(fileSize), - ChunkSize: uint64(chunkSize), - Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1), - } - for i := int64(0); i < (fileSize/chunkSize)+1; i++ { - hsh := 
new([MTHSize]byte) - metaPkt.Checksums = append(metaPkt.Checksums, *hsh) - } - var sizeToSend int64 - var hsh hash.Hash - var pkt *Pkt + br := bufio.NewReader(reader) + var sizeFull int64 var chunkNum int - var path string + checksums := [][MTHSize]byte{} for { - if leftSize <= chunkSize { - sizeToSend = leftSize - } else { - sizeToSend = chunkSize - } - path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) - pkt, err = NewPkt(PktTypeFile, nice, []byte(path)) + lr := io.LimitReader(br, chunkSize) + path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { return err } - hsh = MTHNew(0, 0) - _, err = ctx.Tx( - node, - pkt, - nice, - sizeToSend, - minSize, - io.TeeReader(reader, hsh), - path, - areaId, + hsh := MTHNew(0, 0) + _, size, err := ctx.Tx( + node, pkt, nice, + 0, minSize, maxSize, + io.TeeReader(lr, hsh), + path, areaId, ) + les := LEs{ {"Type", "file"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", path}, - {"Size", sizeToSend}, + {"Size", size}, } logMsg := func(les LEs) string { return fmt.Sprintf( "File %s (%s) sent to %s:%s", srcPath, - humanize.IBytes(uint64(sizeToSend)), + humanize.IBytes(uint64(size)), ctx.NodeName(node.Id), path, ) @@ -583,25 +505,44 @@ func (ctx *Ctx) TxFile( ctx.LogE("tx", les, err, logMsg) return err } - hsh.Sum(metaPkt.Checksums[chunkNum][:0]) - leftSize -= sizeToSend + + sizeFull += size - PktOverhead + var checksum [MTHSize]byte + hsh.Sum(checksum[:0]) + checksums = append(checksums, checksum) chunkNum++ - if leftSize == 0 { + if size < chunkSize { + break + } + if _, err = br.Peek(1); err != nil { break } } - var metaBuf bytes.Buffer - _, err = xdr.Marshal(&metaBuf, metaPkt) + + metaPkt := ChunkedMeta{ + Magic: MagicNNCPMv2.B, + FileSize: uint64(sizeFull), + ChunkSize: uint64(chunkSize), + Checksums: checksums, + } + var buf bytes.Buffer + _, err = xdr.Marshal(&buf, metaPkt) if err != nil { return err } - path = dstPath + 
ChunkedSuffixMeta - pkt, err = NewPkt(PktTypeFile, nice, []byte(path)) + path := dstPath + ChunkedSuffixMeta + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { return err } - metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId) + metaPktSize := int64(buf.Len()) + _, _, err = ctx.Tx( + node, + pkt, + nice, + metaPktSize, minSize, maxSize, + &buf, path, areaId, + ) les := LEs{ {"Type", "file"}, {"Node", node.Id}, @@ -631,7 +572,8 @@ func (ctx *Ctx) TxFreq( node *Node, nice, replyNice uint8, srcPath, dstPath string, - minSize int64) error { + minSize int64, +) error { dstPath = filepath.Clean(dstPath) if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") @@ -646,7 +588,7 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil) + _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -676,8 +618,7 @@ func (ctx *Ctx) TxExec( handle string, args []string, in io.Reader, - minSize int64, - useTmp bool, + minSize int64, maxSize int64, noCompress bool, areaId *AreaId, ) error { @@ -690,82 +631,34 @@ func (ctx *Ctx) TxExec( if noCompress { pktType = PktTypeExecFat } - pkt, rerr := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) - if rerr != nil { - return rerr - } - var size int64 - - if !noCompress && !useTmp { - var compressed bytes.Buffer - compressor, err := zstd.NewWriter( - &compressed, - zstd.WithEncoderLevel(zstd.SpeedDefault), - ) - if err != nil { - return err - } - if _, err = io.Copy(compressor, in); err != nil { - compressor.Close() - return err - } - if err = compressor.Close(); err != nil { - return err - } - size = int64(compressed.Len()) - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId) - } - if noCompress && !useTmp { - var data 
bytes.Buffer - if _, err := io.Copy(&data, in); err != nil { - return err - } - size = int64(data.Len()) - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId) + pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) + if err != nil { + return err } - if !noCompress && useTmp { - r, w := io.Pipe() - compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault)) + compressErr := make(chan error, 1) + if !noCompress { + pr, pw := io.Pipe() + compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault)) if err != nil { return err } - copyErr := make(chan error) - go func() { - _, err := io.Copy(compressor, in) - if err != nil { - compressor.Close() - copyErr <- err + go func(r io.Reader) { + if _, err := io.Copy(compressor, r); err != nil { + compressErr <- err + return } - err = compressor.Close() - w.Close() - copyErr <- err - }() - tmpReader, closer, fileSize, err := throughTmpFile(r) - if closer != nil { - defer closer.Close() - } - if err != nil { - return err - } - err = <-copyErr - if err != nil { - return err - } - size = fileSize - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId) - } - if noCompress && useTmp { - tmpReader, closer, fileSize, err := throughTmpFile(in) - if closer != nil { - defer closer.Close() - } - if err != nil { - return err + compressErr <- compressor.Close() + pw.Close() + }(in) + in = pr + } + _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + if !noCompress { + e := <-compressErr + if err == nil { + err = e } - size = fileSize - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId) } - dst := strings.Join(append([]string{handle}, args...), " ") les := LEs{ {"Type", "exec"}, @@ -781,12 +674,12 @@ func (ctx *Ctx) TxExec( ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), ) } - if rerr == nil { + if err == nil { ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, rerr, logMsg) + 
ctx.LogE("tx", les, err, logMsg) } - return rerr + return err } func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error { diff --git a/src/tx_test.go b/src/tx_test.go index 85a00db..20d20ee 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -19,11 +19,11 @@ package nncp import ( "bytes" + "crypto/rand" "io" "io/ioutil" "os" "path" - "strings" "testing" "testing/quick" @@ -31,12 +31,23 @@ import ( ) func TestTx(t *testing.T) { - f := func(hops uint8, pathSrc, data string, nice, replyNice uint8, padSize int16) bool { + f := func( + hops uint8, + pathSrc string, + dataSize uint32, + nice, replyNice uint8, + minSize uint32, + ) bool { + dataSize %= 1 << 20 + data := make([]byte, dataSize) + if _, err := io.ReadFull(rand.Reader, data); err != nil { + panic(err) + } + minSize %= 1 << 20 if len(pathSrc) > int(MaxPathSize) { pathSrc = pathSrc[:MaxPathSize] } hops = hops % 4 - hops = 1 spool, err := ioutil.TempDir("", "testtx") if err != nil { panic(err) @@ -75,13 +86,14 @@ func TestTx(t *testing.T) { nodeTgt.Via = append(nodeTgt.Via, node.Id) } pkt, err := NewPkt(PktTypeExec, replyNice, []byte(pathSrc)) - src := strings.NewReader(data) - dstNode, err := ctx.Tx( + src := bytes.NewReader(data) + dstNode, _, err := ctx.Tx( nodeTgt, pkt, 123, int64(src.Len()), - int64(padSize), + int64(minSize), + MaxFileSize, src, "pktName", nil, -- 2.44.0