From 66a1f121c12a73e49848e7946ceda8b75095b32e Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Thu, 27 Apr 2017 19:02:50 +0300 Subject: [PATCH] Initial nncp-reass utility --- VERSION | 2 +- common.mk | 4 + src/cypherpunks.ru/nncp/chunked.go | 33 ++ src/cypherpunks.ru/nncp/cmd/nncp-file/main.go | 29 +- .../nncp/cmd/nncp-reass/main.go | 311 ++++++++++++++++++ src/cypherpunks.ru/nncp/humanizer.go | 16 + src/cypherpunks.ru/nncp/tx.go | 131 ++++++++ 7 files changed, 517 insertions(+), 9 deletions(-) create mode 100644 src/cypherpunks.ru/nncp/chunked.go create mode 100644 src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go diff --git a/VERSION b/VERSION index 5a2a580..eb49d7c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6 +0.7 diff --git a/common.mk b/common.mk index e949d12..4f12b1b 100644 --- a/common.mk +++ b/common.mk @@ -28,6 +28,7 @@ ALL = \ nncp-mincfg \ nncp-newcfg \ nncp-pkt \ + nncp-reass \ nncp-rm \ nncp-stat \ nncp-toss \ @@ -68,6 +69,9 @@ nncp-newcfg: nncp-pkt: GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-pkt +nncp-reass: + GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-reass + nncp-rm: GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-rm diff --git a/src/cypherpunks.ru/nncp/chunked.go b/src/cypherpunks.ru/nncp/chunked.go new file mode 100644 index 0000000..abcc6fa --- /dev/null +++ b/src/cypherpunks.ru/nncp/chunked.go @@ -0,0 +1,33 @@ +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2017 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package nncp + +var ( + MagicNNCPMv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'M', 0, 0, 1} + + ChunkedSuffixMeta = ".nncp.meta" + ChunkedSuffixPart = ".nncp.part" +) + +type ChunkedMeta struct { + Magic [8]byte + FileSize uint64 + ChunkSize uint64 + Checksums [][32]byte +} diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-file/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-file/main.go index 4fafc48..824cdb0 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-file/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-file/main.go @@ -39,13 +39,14 @@ func usage() { func main() { var ( - cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") - niceRaw = flag.Int("nice", nncp.DefaultNiceFile, "Outbound packet niceness") - minSize = flag.Uint64("minsize", 0, "Minimal required resulting packet size") - quiet = flag.Bool("quiet", false, "Print only errors") - debug = flag.Bool("debug", false, "Print debug messages") - version = flag.Bool("version", false, "Print version information") - warranty = flag.Bool("warranty", false, "Print warranty information") + cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") + niceRaw = flag.Int("nice", nncp.DefaultNiceFile, "Outbound packet niceness") + minSize = flag.Uint64("minsize", 0, "Minimal required resulting packet size") + chunkSize = flag.Uint64("chunk", 0, "Split file on specified size chunks, in KiB") + quiet = flag.Bool("quiet", false, "Print only errors") + debug = flag.Bool("debug", false, "Print debug messages") + version = flag.Bool("version", false, "Print version information") + warranty = flag.Bool("warranty", false, "Print warranty information") ) flag.Usage = usage flag.Parse() @@ -90,7 +91,19 @@ func main() { log.Fatalln("Invalid NODE specified:", err) } - if err = ctx.TxFile(node, nice, flag.Arg(0), splitted[1], int64(*minSize)); err != nil { + if *chunkSize == 0 { + err = ctx.TxFile(node, nice, flag.Arg(0), splitted[1], int64(*minSize)) + } else { + err = ctx.TxFileChunked( + node, + nice, + flag.Arg(0), + splitted[1], + int64(*minSize), + int64(*chunkSize) * 1024, + ) + } + if err != nil { log.Fatalln(err) } } diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go new file mode 100644 index 0000000..1dbe56e --- /dev/null +++ b/src/cypherpunks.ru/nncp/cmd/nncp-reass/main.go @@ -0,0 +1,311 @@ +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2017 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +// Send file via NNCP +package main + +import ( + "bufio" + "bytes" + "flag" + "fmt" + "hash" + "io" + "io/ioutil" + "log" + "os" + "path/filepath" + "strconv" + "strings" + + "cypherpunks.ru/nncp" + "github.com/davecgh/go-xdr/xdr2" + "golang.org/x/crypto/blake2b" +) + +func usage() { + fmt.Fprintf(os.Stderr, nncp.UsageHeader()) + fmt.Fprintln(os.Stderr, "nncp-reass -- reassemble chunked files\n") + fmt.Fprintf(os.Stderr, "Usage: %s [options] [FILE]\nOptions:\n", os.Args[0]) + flag.PrintDefaults() + fmt.Fprint(os.Stderr, ` +Neither FILE, nor -node nor -all can be set simultaneously, +but at least one of them must be specified. +`) +} + +func process(ctx *nncp.Ctx, path string, keep, dryRun bool) bool { + fd, err := os.Open(path) + defer fd.Close() + if err != nil { + log.Fatalln("Can not open file:", err) + } + var metaPkt nncp.ChunkedMeta + if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil { + ctx.LogE("nncp-reass", nncp.SDS{"path": path, "err": err}, "bad meta file") + return false + } + fd.Close() + if metaPkt.Magic != nncp.MagicNNCPMv1 { + ctx.LogE("nncp-reass", nncp.SDS{"path": path, "err": nncp.BadMagic}, "") + return false + } + metaName := filepath.Base(path) + if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) { + ctx.LogE("nncp-reass", nncp.SDS{ + "path": path, + "err": "invalid filename suffix", + }, "") + return false + } + mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta) + mainDir := filepath.Dir(path) + + chunksPaths := make([]string, 0, len(metaPkt.Checksums)) + for i := 0; i < len(metaPkt.Checksums); i++ { + chunksPaths = append( + chunksPaths, + filepath.Join(mainDir, mainName+nncp.ChunkedSuffixPart+strconv.Itoa(i)), + ) + } + + allChunksExist := true + for chunkNum, chunkPath := range chunksPaths { + fi, err := os.Stat(chunkPath) + if err != nil && os.IsNotExist(err) { + ctx.LogI("nncp-reass", nncp.SDS{ + "path": path, + "chunk": strconv.Itoa(chunkNum), + }, "missing") + allChunksExist = false + continue + } + if chunkNum+1 != len(chunksPaths) && uint64(fi.Size()) != metaPkt.ChunkSize { + ctx.LogE("nncp-reass", nncp.SDS{ + "path": path, + "chunk": strconv.Itoa(chunkNum), + }, "invalid size") + allChunksExist = false + } + } + if !allChunksExist { + return false + } + + var hsh hash.Hash + allChecksumsGood := true + for chunkNum, chunkPath := range chunksPaths { + fd, err = os.Open(chunkPath) + if err != nil { + log.Fatalln("Can not open file:", err) + } + hsh, err = blake2b.New256(nil) + if err != nil { + log.Fatalln(err) + } + if _, err = io.Copy(hsh, bufio.NewReader(fd)); err != nil { + log.Fatalln(err) + } + fd.Close() + if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 { + ctx.LogE("nncp-reass", nncp.SDS{ + "path": path, + "chunk": strconv.Itoa(chunkNum), + "err": "checksum is bad", + }, "") + allChecksumsGood = false + } + } + if !allChecksumsGood { + return false + } + if dryRun { + ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "ready") + return true + } + + tmp, err := ioutil.TempFile(mainDir, "nncp-reass") + if err != nil { + log.Fatalln(err) + } + sds := nncp.SDS{"path": path, "tmp": tmp.Name()} + ctx.LogD("nncp-reass", sds, "created") + tmpW := bufio.NewWriter(tmp) + + hasErrors := false + for chunkNum, chunkPath := range chunksPaths { + fd, err = os.Open(chunkPath) + if err != nil { + log.Fatalln("Can not open file:", err) + } + if _, err = io.Copy(tmpW, bufio.NewReader(fd)); err != nil { + log.Fatalln(err) + } + fd.Close() + if !keep { + if err = os.Remove(chunkPath); err != nil { + ctx.LogE("nncp-reass", nncp.SdsAdd(sds, nncp.SDS{ + "chunk": strconv.Itoa(chunkNum), + "err": err, + }), "") + hasErrors = true + } + } + } + tmpW.Flush() + tmp.Sync() + tmp.Close() + ctx.LogD("nncp-reass", sds, "written") + if !keep { + if err = os.Remove(path); err != nil { + ctx.LogE("nncp-reass", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "") + hasErrors = true + } + } + + dstPathOrig := filepath.Join(mainDir, mainName) + dstPath := dstPathOrig + dstPathCtr := 0 + for { + if _, err = os.Stat(dstPath); err != nil { + if os.IsNotExist(err) { + break + } + log.Fatalln(err) + } + dstPath = dstPathOrig + strconv.Itoa(dstPathCtr) + dstPathCtr++ + } + if err = os.Rename(tmp.Name(), dstPath); err != nil { + log.Fatalln(err) + } + ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "done") + return !hasErrors +} + +func findMetas(ctx *nncp.Ctx, dirPath string) []string { + dir, err := os.Open(dirPath) + defer dir.Close() + if err != nil { + ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath, "err": err}, "") + return nil + } + fis, err := dir.Readdir(0) + dir.Close() + if err != nil { + ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath, "err": err}, "") + return nil + } + metaPaths := make([]string, 0) + for _, fi := range fis { + if strings.HasSuffix(fi.Name(), nncp.ChunkedSuffixMeta) { + metaPaths = append(metaPaths, filepath.Join(dirPath, fi.Name())) + } + } + return metaPaths +} + +func main() { + var ( + cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") + allNodes = flag.Bool("all", false, "Process all found chunked files for all nodes") + nodeRaw = flag.String("node", "", "Process all found chunked files for that node") + keep = flag.Bool("keep", false, "Do not remove chunks while assembling") + dryRun = flag.Bool("dryrun", false, "Do not assemble whole file") + quiet = flag.Bool("quiet", false, "Print only errors") + debug = flag.Bool("debug", false, "Print debug messages") + version = flag.Bool("version", false, "Print version information") + warranty = flag.Bool("warranty", false, "Print warranty information") + ) + flag.Usage = usage + flag.Parse() + if *warranty { + fmt.Println(nncp.Warranty) + return + } + if *version { + fmt.Println(nncp.VersionGet()) + return + } + + cfgRaw, err := ioutil.ReadFile(nncp.CfgPathFromEnv(cfgPath)) + if err != nil { + log.Fatalln("Can not read config:", err) + } + ctx, err := nncp.CfgParse(cfgRaw) + if err != nil { + log.Fatalln("Can not parse config:", err) + } + ctx.Quiet = *quiet + ctx.Debug = *debug + + var nodeOnly *nncp.Node + if *nodeRaw != "" { + nodeOnly, err = ctx.FindNode(*nodeRaw) + if err != nil { + log.Fatalln("Invalid -node specified:", err) + } + } + + if !(*allNodes || nodeOnly != nil || flag.NArg() > 0) { + usage() + os.Exit(1) + } + if flag.NArg() > 0 && (*allNodes || nodeOnly != nil) { + usage() + os.Exit(1) + } + if *allNodes && nodeOnly != nil { + usage() + os.Exit(1) + } + + if flag.NArg() > 0 { + if !process(ctx, flag.Arg(0), *keep, *dryRun) { + os.Exit(1) + } + return + } + + hasErrors := false + if nodeOnly == nil { + seenMetaPaths := make(map[string]struct{}) + for _, node := range ctx.Neigh { + if node.Incoming == nil { + continue + } + for _, metaPath := range findMetas(ctx, *node.Incoming) { + if _, seen := seenMetaPaths[metaPath]; seen { + continue + } + hasErrors = hasErrors || !process(ctx, metaPath, *keep, *dryRun) + seenMetaPaths[metaPath] = struct{}{} + } + } + } else { + if nodeOnly.Incoming == nil { + log.Fatalln("Specified -node does not allow incoming") + } + for _, metaPath := range findMetas(ctx, *nodeOnly.Incoming) { + hasErrors = hasErrors || !process(ctx, metaPath, *keep, *dryRun) + } + } + if hasErrors { + os.Exit(1) + } +} diff --git a/src/cypherpunks.ru/nncp/humanizer.go b/src/cypherpunks.ru/nncp/humanizer.go index 06b33bd..546b71f 100644 --- a/src/cypherpunks.ru/nncp/humanizer.go +++ b/src/cypherpunks.ru/nncp/humanizer.go @@ -218,6 +218,22 @@ func (ctx *Ctx) Humanize(s string) string { default: return s } + case "nncp-reass": + chunkNum, exists := sds["chunk"] + if exists { + msg = fmt.Sprintf( + "Reassembling chunked file \"%s\" (chunk %s): %s", + sds["path"], + chunkNum, + rem, + ) + } else { + msg = fmt.Sprintf( + "Reassembling chunked file \"%s\": %s", + sds["path"], + rem, + ) + } default: return s } diff --git a/src/cypherpunks.ru/nncp/tx.go b/src/cypherpunks.ru/nncp/tx.go index 65ad7ec..a04f5bd 100644 --- a/src/cypherpunks.ru/nncp/tx.go +++ b/src/cypherpunks.ru/nncp/tx.go @@ -23,12 +23,14 @@ import ( "bytes" "compress/zlib" "errors" + "hash" "io" "os" "path/filepath" "strconv" "strings" + "github.com/davecgh/go-xdr/xdr2" "golang.org/x/crypto/blake2b" ) @@ -146,6 +148,135 @@ func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize return err } +func (ctx *Ctx) TxFileChunked(node *Node, nice uint8, srcPath, dstPath string, minSize int64, chunkSize int64) error { + if dstPath == "" { + dstPath = filepath.Base(srcPath) + } + dstPath = filepath.Clean(dstPath) + if filepath.IsAbs(dstPath) { + return errors.New("Relative destination path required") + } + src, err := os.Open(srcPath) + if err != nil { + return err + } + defer src.Close() + srcStat, err := src.Stat() + if err != nil { + return err + } + srcReader := bufio.NewReader(src) + fileSize := srcStat.Size() + leftSize := fileSize + metaPkt := ChunkedMeta{ + Magic: MagicNNCPMv1, + FileSize: uint64(fileSize), + ChunkSize: uint64(chunkSize), + Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1), + } + for i := int64(0); i < (fileSize/chunkSize)+1; i++ { + hsh := new([32]byte) + metaPkt.Checksums = append(metaPkt.Checksums, *hsh) + } + var sizeToSend int64 + var hsh hash.Hash + var pkt *Pkt + var chunkNum int + var path string + for { + if leftSize <= chunkSize { + sizeToSend = leftSize + } else { + sizeToSend = chunkSize + } + path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) + pkt, err = NewPkt(PktTypeFile, path) + if err != nil { + return err + } + hsh, err = blake2b.New256(nil) + if err != nil { + return err + } + _, err = ctx.Tx( + node, + pkt, + nice, + sizeToSend, + minSize, + io.TeeReader(srcReader, hsh), + ) + if err == nil { + ctx.LogD("tx", SDS{ + "type": "file", + "node": node.Id, + "nice": strconv.Itoa(int(nice)), + "src": srcPath, + "dst": path, + "size": strconv.FormatInt(sizeToSend, 10), + }, "sent") + } else { + ctx.LogE("tx", SDS{ + "type": "file", + "node": node.Id, + "nice": strconv.Itoa(int(nice)), + "src": srcPath, + "dst": path, + "size": strconv.FormatInt(sizeToSend, 10), + "err": err, + }, "sent") + return err + } + hsh.Sum(metaPkt.Checksums[chunkNum][:0]) + leftSize -= sizeToSend + chunkNum++ + if leftSize == 0 { + break + } + } + var metaBuf bytes.Buffer + _, err = xdr.Marshal(&metaBuf, metaPkt) + if err != nil { + return err + } + path = dstPath + ChunkedSuffixMeta + pkt, err = NewPkt(PktTypeFile, path) + if err != nil { + return err + } + metaPktSize := int64(metaBuf.Len()) + _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf) + if err == nil { + ctx.LogD("tx", SDS{ + "type": "file", + "node": node.Id, + "nice": strconv.Itoa(int(nice)), + "src": srcPath, + "dst": path, + "size": strconv.FormatInt(metaPktSize, 10), + }, "sent") + ctx.LogI("tx", SDS{ + "type": "file", + "node": node.Id, + "nice": strconv.Itoa(int(nice)), + "src": srcPath, + "dst": dstPath, + "size": strconv.FormatInt(fileSize, 10), + }, "sent") + } else { + ctx.LogE("tx", SDS{ + "type": "file", + "node": node.Id, + "nice": strconv.Itoa(int(nice)), + "src": srcPath, + "dst": path, + "size": strconv.FormatInt(metaPktSize, 10), + "err": err, + }, "sent") + } + return err +} + func (ctx *Ctx) TxFreq(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error { dstPath = filepath.Clean(dstPath) if filepath.IsAbs(dstPath) { -- 2.44.0