From 6d771479c630cc44a3873ded16ce24f75aadf0fe Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 18 Nov 2017 23:06:42 +0300 Subject: [PATCH] Bundles feature --- common.mk | 4 + doc/bundles.texi | 55 +++ doc/cmds.texi | 43 +++ doc/index.texi | 2 + doc/news.ru.texi | 5 + doc/news.texi | 5 + ports/nncp/Makefile | 3 +- .../nncp/cmd/nncp-bundle/main.go | 319 ++++++++++++++++++ src/cypherpunks.ru/nncp/humanizer.go | 19 ++ src/cypherpunks.ru/nncp/tmp.go | 11 +- 10 files changed, 461 insertions(+), 5 deletions(-) create mode 100644 doc/bundles.texi create mode 100644 src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go diff --git a/common.mk b/common.mk index 327f8df..34d6404 100644 --- a/common.mk +++ b/common.mk @@ -17,6 +17,7 @@ LDFLAGS = \ -X cypherpunks.ru/nncp.DefaultLogPath=$(LOGPATH) ALL = \ + nncp-bundle \ nncp-call \ nncp-caller \ nncp-cfgenc \ @@ -37,6 +38,9 @@ ALL = \ all: $(ALL) +nncp-bundle: + GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-bundle + nncp-call: GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-call diff --git a/doc/bundles.texi b/doc/bundles.texi new file mode 100644 index 0000000..a70e023 --- /dev/null +++ b/doc/bundles.texi @@ -0,0 +1,55 @@ +@node Bundles +@unnumbered Bundles + +Usual @ref{nncp-xfer} command requires filesystem it can operate on. +That presumes random access media storage usage, like hard drives, USB +flash drives and similar. But media like CD-ROM and especially tape +drives are sequential by nature. You can prepare intermediate directory +for recording to CD-ROM disc, but that requires additional storage and +is inconvenient. Tape drive will require intermediate extract step too. + +Bundles, created with @ref{nncp-bundle} command are convenient +alternative to ordinary @command{nncp-xfer}. Bundle is just a collection +of @ref{Encrypted, encrypted packets}, stream of packets. It could be +sequentially streamed for recording and digested back. + +@itemize + +@item They do not require intermediate storage before recording on +either CD-ROM or tape drive. +@verbatim +% nncp-bundle -tx SOMENODE | cdrecord -tao - # record directly to CD +% nncp-bundle -tx SOMENODE | dd of=/dev/sa0 bs=512 # record directly to tape + +% dd if=/dev/cd0 bs=2048 | nncp-bundle -rx # read directly from CD +% dd if=/dev/sa0 bs=512 | nncp-bundle -rx # read directly from tape +@end verbatim + +@item They do not require filesystem existence to deal with, simplifying +administration when operating in heterogeneous systems with varying +filesystems. No @command{mount}/@command{umount}, @command{zpool +import}/@command{zpool export} and struggling with file permissions. +@verbatim +% nncp-bundle -tx SOMENODE | dd of=/dev/da0 bs=1M # record directly to + # hard/flash drive +% dd if=/dev/da0 bs=1M | nncp-bundle -rx # read directly from drive +@end verbatim + +@item This is the fastest way to record outbound packets for offline +transmission -- sequential write is always faster, when no +metainformation needs to be updated. + +@item This is convenient to use with write-only/append-only storages, +just sending/appending new bundles. + +@item Bundles could be repeatedly broadcasted in one-way transmission. +@ref{Sync, Sync protocol} requires interactive connection, but bundles +can contain mix of various recipients. + +@end itemize + +Technically bundle is valid POSIX.1 +@url{http://www.freebsd.org/cgi/man.cgi?query=tar&sektion=5, tar archive}, +with directory/files hierarchy identical to that is used in +@ref{nncp-xfer}. So bundle can be created by manual tar-ing of +@command{nncp-xfer} resulting directory too. diff --git a/doc/cmds.texi b/doc/cmds.texi index 38f77ee..f1a029a 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -30,6 +30,49 @@ Nearly all commands have the following common options: Print warranty information (no warranty). @end table +@node nncp-bundle +@section nncp-bundle + +@verbatim +% nncp-bundle [options] -tx [-delete] NODE [NODE ...] > ... +% nncp-bundle [options] -rx -delete [NODE ...] < ... +% nncp-bundle [options] -rx [-check] [NODE ...] < ... +@end verbatim + +With @option{-tx} option, this command creates @ref{Bundles, bundle} of +@ref{Encrypted, encrypted packets} from the spool directory and writes +it to stdout. + +With @option{-rx} option, this command takes bundle from stdin and +copies all found packets for our node to the spool directory. Pay +attention that @strong{no} integrity checking is done by default. Modern +tape drives could easily provide too much throughput your CPU won't be +able to verify on the fly. So if you won't @ref{nncp-toss, toss} +received packets at the place, it is advised to run @ref{nncp-check} +utility for packets integrity verification, or use @option{-check} +option to enable on the fly integrity check. + +You can specify multiple @option{NODE} arguments, telling for what nodes +you want to create the stream, or take it from. If no nodes are +specified for @option{-rx} mode, then all packets aimed at us will be +processed. + +When packets are sent through the stream, they are still kept in the +spool directory, because there is no assurance that they are transferred +to the media (media (CD-ROM, tape drive, raw hard drive) can end). If +you want to forcefully delete them (after they are successfully flushed +to stdout) anyway, use @option{-delete} option. + +But you can verify produced stream after, by digesting it by yourself +with @option{-rx} and @option{-delete} options -- in that mode, stream +packets integrity will be checked and they will be deleted from the +spool if everything is good. So it is advisable to recheck your streams: + +@verbatim +% nncp-bundle -tx ALICE BOB WHATEVER | cdrecord -tao - +% dd if=/dev/cd0 bs=2048 | nncp-bundle -rx -delete +@end verbatim + @node nncp-call @section nncp-call diff --git a/doc/index.texi b/doc/index.texi index 4ce8613..a01d654 100644 --- a/doc/index.texi +++ b/doc/index.texi @@ -37,6 +37,7 @@ A copy of the license is included in the section entitled "Copying conditions". * Commands:: * Niceness:: * Chunked files: Chunked. +* Bundles:: * Spool directory: Spool. * Log format: Log. * Packet format: Packet. @@ -59,6 +60,7 @@ A copy of the license is included in the section entitled "Copying conditions". @include cmds.texi @include niceness.texi @include chunked.texi +@include bundles.texi @include spool.texi @include log.texi @include pkt.texi diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 6d83db8..f0a2b6f 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -8,6 +8,11 @@ @strong{Несовместимое} изменение формата зашифрованных пакетов. Работа со старыми версиями не поддерживается. @item +@command{nncp-bundle} команда может создавать потоки зашифрованных +пакетов или потреблять их. Это полезно когда речь идёт о stdin/stdout +методах передачи (например запись на CD-ROM без создания промежуточного +подготовленного ISO образа или работа с ленточными накопителями). +@item В команде @command{nncp-call} разрешается иметь только одного обработчика контрольной суммы в фоне. Это полезно когда тысячи маленьких входящих пакетов могут создать много горутин. diff --git a/doc/news.texi b/doc/news.texi index e03a11c..190d04a 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -10,6 +10,11 @@ See also this page @ref{Новости, on russian}. @strong{Incompatible} encrypted packet format changes. Older versions are not supported. @item +@command{nncp-bundle} command can either create stream of encrypted +packets, or digest it. It is useful when dealing with stdin/stdout based +transmission methods (like writing to CD-ROM without intermediate +prepared ISO image and working with tape drives). +@item Single background checksum verifier worker is allowed in @command{nncp-call}. This is helpful when thousands of small inbound packets could create many goroutines. diff --git a/ports/nncp/Makefile b/ports/nncp/Makefile index 22b3313..c6bf12d 100644 --- a/ports/nncp/Makefile +++ b/ports/nncp/Makefile @@ -25,7 +25,8 @@ PORTDOCS= AUTHORS NEWS NEWS.RU README README.RU THANKS INFO= nncp INSTALL_TARGET= install-strip -PLIST_FILES= bin/nncp-call \ +PLIST_FILES= bin/nncp-bundle \ + bin/nncp-call \ bin/nncp-caller \ bin/nncp-cfgenc \ bin/nncp-cfgmin \ diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go new file mode 100644 index 0000000..36c9cc9 --- /dev/null +++ b/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go @@ -0,0 +1,319 @@ +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2017 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +// Create/digest stream of NNCP encrypted packets +package main + +import ( + "archive/tar" + "bufio" + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "strconv" + "strings" + + "cypherpunks.ru/nncp" + "github.com/davecgh/go-xdr/xdr2" + "golang.org/x/crypto/blake2b" +) + +const ( + CopyBufSize = 1 << 17 +) + +func usage() { + fmt.Fprintf(os.Stderr, nncp.UsageHeader()) + fmt.Fprintln(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n") + fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -rx -delete [NODE ...] < ...\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -rx [-check] [NODE ...] < ...\n", os.Args[0]) + fmt.Fprintln(os.Stderr, "Options:") + flag.PrintDefaults() +} + +func main() { + var ( + cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") + niceRaw = flag.Int("nice", 255, "Minimal required niceness") + doRx = flag.Bool("rx", false, "Receive packets") + doTx = flag.Bool("tx", false, "Transfer packets") + doDelete = flag.Bool("delete", false, "Delete transferred packets") + doCheck = flag.Bool("check", false, "Check integrity while receiving") + quiet = flag.Bool("quiet", false, "Print only errors") + debug = flag.Bool("debug", false, "Print debug messages") + version = flag.Bool("version", false, "Print version information") + warranty = flag.Bool("warranty", false, "Print warranty information") + ) + flag.Usage = usage + flag.Parse() + if *warranty { + fmt.Println(nncp.Warranty) + return + } + if *version { + fmt.Println(nncp.VersionGet()) + return + } + if *niceRaw < 1 || *niceRaw > 255 { + log.Fatalln("-nice must be between 1 and 255") + } + nice := uint8(*niceRaw) + if *doRx && *doTx { + log.Fatalln("-rx and -tx can not be set simultaneously") + } + if !*doRx && !*doTx { + log.Fatalln("At least one of -rx and -tx must be specified") + } + + cfgRaw, err := ioutil.ReadFile(nncp.CfgPathFromEnv(cfgPath)) + if err != nil { + log.Fatalln("Can not read config:", err) + } + ctx, err := nncp.CfgParse(cfgRaw) + if err != nil { + log.Fatalln("Can not parse config:", err) + } + ctx.Quiet = *quiet + ctx.Debug = *debug + + nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg()) + for i := 0; i < flag.NArg(); i++ { + node, err := ctx.FindNode(flag.Arg(i)) + if err != nil { + log.Fatalln("Invalid specified:", err) + } + nodeIds[*node.Id] = struct{}{} + } + + sds := nncp.SDS{} + if *doTx { + sds["xx"] = string(nncp.TTx) + var pktName string + bufStdout := bufio.NewWriter(os.Stdout) + tarWr := tar.NewWriter(bufStdout) + for nodeId, _ := range nodeIds { + sds["node"] = nodeId.String() + for job := range ctx.Jobs(&nodeId, nncp.TTx) { + pktName = filepath.Base(job.Fd.Name()) + sds["pkt"] = pktName + if job.PktEnc.Nice > nice { + ctx.LogD("nncp-bundle", sds, "too nice") + job.Fd.Close() + continue + } + if err = tarWr.WriteHeader(&tar.Header{ + Name: strings.Join([]string{ + nodeId.String(), + ctx.SelfId.String(), + pktName, + }, "/"), + Mode: 0440, + Size: job.Size, + Typeflag: tar.TypeReg, + }); err != nil { + log.Fatalln("Error writing tar header:", err) + } + if _, err = io.Copy(tarWr, job.Fd); err != nil { + log.Fatalln("Error during copying to tar:", err) + } + job.Fd.Close() + if err = tarWr.Flush(); err != nil { + log.Fatalln("Error during tar flushing:", err) + } + if err = bufStdout.Flush(); err != nil { + log.Fatalln("Error during stdout flushing:", err) + } + if *doDelete { + if err = os.Remove(job.Fd.Name()); err != nil { + log.Fatalln("Error during deletion:", err) + } + } + ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{ + "size": strconv.FormatInt(job.Size, 10), + }), "") + } + } + if err = tarWr.Close(); err != nil { + log.Fatalln("Error during tar closing:", err) + } + } else { + tarR := tar.NewReader(bufio.NewReaderSize(os.Stdin, CopyBufSize)) + var entry *tar.Header + var sepIndex int + var exists bool + pktEncBuf := make([]byte, nncp.PktEncOverhead) + var pktEnc *nncp.PktEnc + var pktName string + var selfPath string + var dstPath string + for { + sds["xx"] = string(nncp.TRx) + entry, err = tarR.Next() + if err != nil { + if err == io.EOF { + break + } + log.Fatalln("Error during tar reading:", err) + } + sds["pkt"] = entry.Name + if entry.Size < nncp.PktEncOverhead { + ctx.LogD("nncp-bundle", sds, "Too small packet") + continue + } + sepIndex = strings.LastIndex(entry.Name, "/") + if sepIndex == -1 { + ctx.LogD("nncp-bundle", sds, "Bad packet name") + continue + } + pktName = entry.Name[sepIndex+1:] + if _, err = nncp.FromBase32(pktName); err != nil { + ctx.LogD("nncp-bundle", sds, "Bad packet name") + continue + } + if _, err = io.ReadFull(tarR, pktEncBuf); err != nil { + ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + continue + } + if _, err = xdr.Unmarshal(bytes.NewBuffer(pktEncBuf), &pktEnc); err != nil { + ctx.LogD("nncp-bundle", sds, "Bad packet structure") + continue + } + if pktEnc.Magic != nncp.MagicNNCPEv2 { + ctx.LogD("nncp-bundle", sds, "Bad packet magic number") + continue + } + if pktEnc.Nice > nice { + ctx.LogD("nncp-bundle", sds, "too nice") + continue + } + if *pktEnc.Sender == *ctx.SelfId && *doDelete { + if len(nodeIds) > 0 { + if _, exists = nodeIds[*pktEnc.Recipient]; !exists { + ctx.LogD("nncp-bundle", sds, "Recipient is not requested") + continue + } + } + nodeId32 := nncp.ToBase32(pktEnc.Recipient[:]) + sds["xx"] = string(nncp.TTx) + sds["node"] = nodeId32 + sds["pkt"] = pktName + dstPath = filepath.Join( + ctx.Spool, + nodeId32, + string(nncp.TTx), + pktName, + ) + if _, err = os.Stat(dstPath); err != nil { + ctx.LogD("nncp-bundle", sds, "Packet is already missing") + continue + } + hsh, err := blake2b.New256(nil) + if err != nil { + log.Fatalln("Error during hasher creation:", err) + } + if _, err = hsh.Write(pktEncBuf); err != nil { + log.Fatalln("Error during writing:", err) + } + if _, err = io.Copy(hsh, tarR); err != nil { + log.Fatalln("Error during copying:", err) + } + if nncp.ToBase32(hsh.Sum(nil)) == pktName { + ctx.LogI("nncp-bundle", sds, "removed") + os.Remove(dstPath) + } else { + ctx.LogE("nncp-bundle", sds, "bad checksum") + } + continue + } + if *pktEnc.Recipient != *ctx.SelfId { + ctx.LogD("nncp-bundle", sds, "Unknown recipient") + continue + } + if len(nodeIds) > 0 { + if _, exists = nodeIds[*pktEnc.Sender]; !exists { + ctx.LogD("nncp-bundle", sds, "Sender is not requested") + continue + } + } + sds["node"] = nncp.ToBase32(pktEnc.Recipient[:]) + sds["pkt"] = pktName + selfPath = filepath.Join(ctx.Spool, ctx.SelfId.String(), string(nncp.TRx)) + dstPath = filepath.Join(selfPath, pktName) + if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) { + ctx.LogD("nncp-bundle", sds, "Packet already exists") + continue + } + if *doCheck { + tmp, err := ctx.NewTmpFileWHash() + if err != nil { + log.Fatalln("Error during temporary file creation:", err) + } + if _, err = tmp.W.Write(pktEncBuf); err != nil { + log.Fatalln("Error during writing:", err) + } + if _, err = io.Copy(tmp.W, tarR); err != nil { + log.Fatalln("Error during copying:", err) + } + if err = tmp.W.Flush(); err != nil { + log.Fatalln("Error during flusing:", err) + } + if nncp.ToBase32(tmp.Hsh.Sum(nil)) == pktName { + if err = tmp.Commit(selfPath); err != nil { + log.Fatalln("Error during commiting:", err) + } + } else { + ctx.LogE("nncp-bundle", sds, "bad checksum") + tmp.Cancel() + continue + } + } else { + tmp, err := ctx.NewTmpFile() + if err != nil { + log.Fatalln("Error during temporary file creation:", err) + } + bufTmp := bufio.NewWriterSize(tmp, CopyBufSize) + if _, err = bufTmp.Write(pktEncBuf); err != nil { + log.Fatalln("Error during writing:", err) + } + if _, err = io.Copy(bufTmp, tarR); err != nil { + log.Fatalln("Error during copying:", err) + } + if err = bufTmp.Flush(); err != nil { + log.Fatalln("Error during flushing:", err) + } + tmp.Sync() + tmp.Close() + if err = os.MkdirAll(selfPath, os.FileMode(0700)); err != nil { + log.Fatalln("Error during mkdir:", err) + } + if err = os.Rename(tmp.Name(), dstPath); err != nil { + log.Fatalln("Error during renaming:", err) + } + } + ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{ + "size": strconv.FormatInt(entry.Size, 10), + }), "") + } + } +} diff --git a/src/cypherpunks.ru/nncp/humanizer.go b/src/cypherpunks.ru/nncp/humanizer.go index 7abfcab..7830a31 100644 --- a/src/cypherpunks.ru/nncp/humanizer.go +++ b/src/cypherpunks.ru/nncp/humanizer.go @@ -150,6 +150,25 @@ func (ctx *Ctx) Humanize(s string) string { if err, exists := sds["err"]; exists { msg += ": " + err } + case "nncp-bundle": + switch sds["xx"] { + case "rx": + msg = "Bundle transfer, received from" + case "tx": + msg = "Bundle transfer, sent to" + default: + return s + } + if nodeS != "" { + msg += " node " + nodeS + } + msg += " " + sds["pkt"] + if size != "" { + msg += fmt.Sprintf(" (%s)", size) + } + if err, exists := sds["err"]; exists { + msg += ": " + err + } case "call-start": msg = fmt.Sprintf("Connected to %s", nodeS) case "call-finish": diff --git a/src/cypherpunks.ru/nncp/tmp.go b/src/cypherpunks.ru/nncp/tmp.go index 2ce02cb..47a20d0 100644 --- a/src/cypherpunks.ru/nncp/tmp.go +++ b/src/cypherpunks.ru/nncp/tmp.go @@ -45,7 +45,7 @@ func (ctx *Ctx) NewTmpFile() (*os.File, error) { type TmpFileWHash struct { W *bufio.Writer Fd *os.File - hsh hash.Hash + Hsh hash.Hash ctx *Ctx } @@ -61,7 +61,7 @@ func (ctx *Ctx) NewTmpFileWHash() (*TmpFileWHash, error) { return &TmpFileWHash{ W: bufio.NewWriter(io.MultiWriter(hsh, tmp)), Fd: tmp, - hsh: hsh, + Hsh: hsh, ctx: ctx, }, nil } @@ -81,9 +81,12 @@ func (tmp *TmpFileWHash) Commit(dir string) error { tmp.Fd.Close() return err } - tmp.Fd.Sync() + if err = tmp.Fd.Sync(); err != nil { + tmp.Fd.Close() + return err + } tmp.Fd.Close() - checksum := ToBase32(tmp.hsh.Sum(nil)) + checksum := ToBase32(tmp.Hsh.Sum(nil)) tmp.ctx.LogD("tmp", SDS{"src": tmp.Fd.Name(), "dst": checksum}, "commit") return os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)) } -- 2.44.0