2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 // Create/digest stream of NNCP encrypted packets.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/dustin/go-humanize"
37 "go.cypherpunks.ru/nncp/v6"
38 "golang.org/x/crypto/blake2b"
// Usage text (the enclosing func header is not among the lines shown here).
// Everything is written to stderr: the shared NNCP usage header, a one-line
// summary of the tool, the three accepted invocation forms and finally the
// "Options:" heading.
// NOTE(review): the header string is passed as the Fprintf *format* argument,
// so any '%' inside it would be interpreted as a verb -- confirm it never
// contains one.
46 fmt.Fprintf(os.Stderr, nncp.UsageHeader())
47 fmt.Fprintf(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n\n")
// Transmit form: the bundle is redirected from stdout.
48 fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0])
// Receive forms: the bundle is fed on stdin.
49 fmt.Fprintf(os.Stderr, " %s [options] -rx -delete [-dryrun] [NODE ...] < ...\n", os.Args[0])
50 fmt.Fprintf(os.Stderr, " %s [options] -rx [-check] [-dryrun] [NODE ...] < ...\n", os.Args[0])
51 fmt.Fprintln(os.Stderr, "Options:")
// Command-line flags. Each call registers an option and yields a pointer
// that is dereferenced later on (e.g. *niceRaw, *doDelete).
57 cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
// Textual niceness threshold; it is parsed into `nice` and packets whose
// Nice value exceeds it are reported as "too nice".
58 niceRaw = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness")
// Mode selectors: setting both, or neither, is rejected with a fatal error.
59 doRx = flag.Bool("rx", false, "Receive packets")
60 doTx = flag.Bool("tx", false, "Transfer packets")
61 doDelete = flag.Bool("delete", false, "Delete transferred packets")
62 doCheck = flag.Bool("check", false, "Check integrity while receiving")
63 dryRun = flag.Bool("dryrun", false, "Do no writes")
// Overrides and verbosity switches (their consumers are not visible in the
// lines shown here).
64 spoolPath = flag.String("spool", "", "Override path to spool")
65 logPath = flag.String("log", "", "Override path to logfile")
66 quiet = flag.Bool("quiet", false, "Print only errors")
67 showPrgrs = flag.Bool("progress", false, "Force progress showing")
68 omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing")
69 debug = flag.Bool("debug", false, "Print debug messages")
// Informational flags.
70 version = flag.Bool("version", false, "Print version information")
71 warranty = flag.Bool("warranty", false, "Print warranty information")
// Startup: configure the logger, serve informational flags, validate the
// selected mode and build the NNCP context. The `if` lines guarding most of
// these statements are not among the lines shown here.
73 log.SetFlags(log.Lshortfile)
// Informational output -- presumably guarded by -warranty and -version
// respectively; confirm against the full file.
77 fmt.Println(nncp.Warranty)
81 fmt.Println(nncp.VersionGet())
// Convert the textual -nice value into the threshold compared against
// packet niceness further down.
84 nice, err := nncp.NicenessParse(*niceRaw)
// Mode validation: both failures terminate the program.
89 log.Fatalln("-rx and -tx can not be set simultaneously")
92 log.Fatalln("At least one of -rx and -tx must be specified")
// Build the runtime context (the call's arguments are not visible here);
// failure is fatal.
95 ctx, err := nncp.CtxFromCmdline(
105 log.Fatalln("Error during initialization:", err)
// Set of node ids named as positional arguments. Each argument is resolved
// through ctx.FindNode. In the receive code an empty set disables filtering
// (lookups are wrapped in `len(nodeIds) > 0`).
108 nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg())
109 for i := 0; i < flag.NArg(); i++ {
110 node, err := ctx.FindNode(flag.Arg(i))
// NOTE(review): the message below reads as if a word is missing; it is
// runtime text and is left untouched.
112 log.Fatalln("Invalid specified:", err)
114 nodeIds[*node.Id] = struct{}{}
// Transmit path: every outbound (Tx) job of every requested node is appended
// to a tar stream written to buffered stdout.
121 bufStdout := bufio.NewWriter(os.Stdout)
122 tarWr := tar.NewWriter(bufStdout)
// NOTE(review): &nodeId is the address of the range variable; before Go 1.22
// that variable is shared between iterations -- confirm ctx.Jobs does not
// keep the pointer beyond the current iteration.
123 for nodeId := range nodeIds {
124 for job := range ctx.Jobs(&nodeId, nncp.TTx) {
// pktName is assigned, not declared, here; its declaration is outside the
// lines shown. The three entries that follow are log fields (the opening of
// that literal is not visible).
125 pktName = filepath.Base(job.Path)
127 {K: "XX", V: string(nncp.TTx)},
128 {K: "Node", V: nodeId.String()},
129 {K: "Pkt", V: pktName},
// Packets whose niceness exceeds the threshold get a debug log entry
// (presumably followed by a skip; that line is not visible).
131 if job.PktEnc.Nice > nice {
132 ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string {
134 "Bundle transfer %s/tx/%s: too nice %s",
135 ctx.NodeName(&nodeId),
137 nncp.NicenessFmt(job.PktEnc.Nice),
142 fd, err := os.Open(job.Path)
144 log.Fatalln("Error during opening:", err)
// Each packet is preceded by a USTAR directory entry named with the bundle
// prefix; the receive side requires a directory entry before every packet
// entry.
146 if err = tarWr.WriteHeader(&tar.Header{
147 Format: tar.FormatUSTAR,
148 Name: nncp.NNCPBundlePrefix,
150 Typeflag: tar.TypeDir,
152 log.Fatalln("Error writing tar header:", err)
// The packet itself is a PAX regular-file entry whose name starts with the
// bundle prefix (the remaining name components are not visible here).
154 if err = tarWr.WriteHeader(&tar.Header{
155 Format: tar.FormatPAX,
156 Name: strings.Join([]string{
157 nncp.NNCPBundlePrefix,
164 Typeflag: tar.TypeReg,
166 log.Fatalln("Error writing tar header:", err)
// Stream the packet file into the tar entry with progress reporting.
168 if _, err = nncp.CopyProgressed(
169 tarWr, bufio.NewReader(fd), "Tx",
170 append(les, nncp.LEs{
171 {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
172 {K: "FullSize", V: job.Size},
176 log.Fatalln("Error during copying to tar:", err)
// Close the source and push everything through both buffering layers before
// the source file may be removed.
178 if err = fd.Close(); err != nil {
179 log.Fatalln("Error during closing:", err)
181 if err = tarWr.Flush(); err != nil {
182 log.Fatalln("Error during tar flushing:", err)
184 if err = bufStdout.Flush(); err != nil {
185 log.Fatalln("Error during stdout flushing:", err)
// Remove the spooled packet -- presumably only when -delete was given (the
// guard is not visible). The companion header file is removed best-effort:
// its error is ignored.
188 if err = os.Remove(job.Path); err != nil {
189 log.Fatalln("Error during deletion:", err)
190 } else if ctx.HdrUsage {
191 os.Remove(job.Path + nncp.HdrSuffix)
// Log entry for the packet that was sent.
196 append(les, nncp.LE{K: "Size", V: job.Size}),
197 func(les nncp.LEs) string {
199 "Bundle transfer, sent to node %s %s (%s)",
200 ctx.NodeName(&nodeId),
202 humanize.IBytes(uint64(job.Size)),
// Finish the archive once all jobs have been written.
208 if err = tarWr.Close(); err != nil {
209 log.Fatalln("Error during tar closing:", err)
// Receive path, part 1: locate the bundle prefix in stdin and read the tar
// entries that start there. The reader's buffer is twice the size of the
// window that gets peeked.
212 bufStdin := bufio.NewReaderSize(os.Stdin, CopyBufSize*2)
// Scratch buffer of nncp.PktEncOverhead bytes; the encrypted-packet header
// is read into it and then decoded into pktEnc.
213 pktEncBuf := make([]byte, nncp.PktEncOverhead)
214 var pktEnc *nncp.PktEnc
// Look at up to CopyBufSize bytes without consuming them; io.EOF is not
// treated as a failure here.
216 peeked, err := bufStdin.Peek(CopyBufSize)
217 if err != nil && err != io.EOF {
218 log.Fatalln("Error during reading:", err)
220 prefixIdx := bytes.Index(peeked, []byte(nncp.NNCPBundlePrefix))
// Drops all buffered bytes except the last len(prefix)-1 -- presumably the
// "prefix not found" branch, keeping a tail so that a prefix straddling two
// windows can still match. The Discard result is deliberately ignored.
225 bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1)) // #nosec G104
// Skip everything in front of the prefix so the tar reader starts on it.
228 if _, err = bufStdin.Discard(prefixIdx); err != nil {
231 tarR := tar.NewReader(bufStdin)
// First entry; a read error is logged under "bundle-rx-read-tar".
232 entry, err := tarR.Next()
236 "bundle-rx-read-tar",
237 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
238 func(les nncp.LEs) string {
239 return "Bundle transfer rx: reading tar"
// The first entry must be a directory; anything else is logged with the
// error "expected NNCP/".
245 if entry.Typeflag != tar.TypeDir {
247 "bundle-rx-read-tar",
249 {K: "XX", V: string(nncp.TRx)},
250 {K: "Err", V: errors.New("expected NNCP/")},
252 func(les nncp.LEs) string {
253 return "Bundle transfer rx: reading tar"
// Second entry: the packet itself (its Name and Size are used below).
258 entry, err = tarR.Next()
262 "bundle-rx-read-tar",
263 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
264 func(les nncp.LEs) string {
265 return "Bundle transfer rx: reading tar"
// Receive path, part 2: sanity checks on the packet entry before any payload
// is processed. Log fields and the message builder start out keyed by the
// raw tar entry name.
271 les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
272 logMsg := func(les nncp.LEs) string {
273 return "Bundle transfer rx/" + entry.Name
// An entry shorter than the encrypted-packet header cannot be a packet.
275 if entry.Size < nncp.PktEncOverhead {
276 ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string {
277 return logMsg(les) + ": too small packet"
// Ask the context whether there is room for entry.Size bytes.
281 if !ctx.IsEnoughSpace(entry.Size) {
282 ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg)
// The file name must decode as Base32; it is later compared with the
// Base32-encoded BLAKE2b-256 digest of the packet.
285 pktName := filepath.Base(entry.Name)
286 if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
289 append(les, nncp.LE{K: "Err", V: "bad packet name"}),
// Read exactly len(pktEncBuf) header bytes from the entry and XDR-decode
// them into pktEnc.
294 if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
297 append(les, nncp.LE{K: "Err", V: err}),
302 if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
305 append(les, nncp.LE{K: "Err", V: "Bad packet structure"}),
// Only packets carrying the MagicNNCPEv4 magic are accepted.
310 if pktEnc.Magic != nncp.MagicNNCPEv4 {
313 append(les, nncp.LE{K: "Err", V: "Bad packet magic number"}),
// Niceness above the threshold is reported at debug level.
318 if pktEnc.Nice > nice {
319 ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string {
320 return logMsg(les) + ": too nice"
// Receive path, part 3: the packet was sent by this node and -delete is set.
// The bundle copy is checked against its name and the local outbound copy is
// handled (the removal call itself is not among the lines shown here).
324 if *pktEnc.Sender == *ctx.SelfId && *doDelete {
// When nodes were named on the command line, only packets addressed to one
// of them are considered.
325 if len(nodeIds) > 0 {
326 if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
327 ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string {
328 return logMsg(les) + ": recipient is not requested"
// From here on log fields and messages are keyed by recipient and packet
// name in the tx direction.
333 nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:])
335 {K: "XX", V: string(nncp.TTx)},
336 {K: "Node", V: nodeId32},
337 {K: "Pkt", V: pktName},
339 logMsg = func(les nncp.LEs) string {
340 return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName)
// Any os.Stat error on the spooled packet is reported as "already missing".
342 dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
343 if _, err = os.Stat(dstPath); err != nil {
344 ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string {
345 return logMsg(les) + ": packet is already missing"
// BLAKE2b-256 hasher: the already-read header goes in first, then further
// data is copied with progress reporting (that call's arguments are not
// visible here).
349 hsh, err := blake2b.New256(nil)
351 log.Fatalln("Error during hasher creation:", err)
353 if _, err = hsh.Write(pktEncBuf); err != nil {
354 log.Fatalln("Error during writing:", err)
356 if _, err = nncp.CopyProgressed(
358 append(les, nncp.LE{K: "FullSize", V: entry.Size}),
361 log.Fatalln("Error during copying:", err)
// Digest equal to the file name: logged as "removed" and the companion
// header file is removed best-effort (its error is ignored). Otherwise a
// "bad checksum" error is logged.
363 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
364 ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string {
365 return logMsg(les) + ": removed"
370 os.Remove(dstPath + nncp.HdrSuffix)
374 ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg)
// Receive path, part 4: decide whether an incoming packet should be taken.
// Packets not addressed to this node are reported as "unknown recipient".
378 if *pktEnc.Recipient != *ctx.SelfId {
379 ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string {
380 return logMsg(les) + ": unknown recipient"
// When nodes were named on the command line, only packets from one of them
// are considered.
384 if len(nodeIds) > 0 {
385 if _, exists := nodeIds[*pktEnc.Sender]; !exists {
386 ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string {
387 return logMsg(les) + ": sender is not requested"
// Switch log fields and messages to sender and packet name in the rx
// direction.
392 sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:])
394 {K: "XX", V: string(nncp.TRx)},
395 {K: "Node", V: sender},
396 {K: "Pkt", V: pktName},
397 {K: "FullSize", V: entry.Size},
399 logMsg = func(les nncp.LEs) string {
400 return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName)
// Destination inside the spool: <Spool>/<sender>/<rx>/<pktName>.
402 dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
403 dstPath := filepath.Join(dstDirPath, pktName)
// Every Stat outcome other than "does not exist" -- success as well as any
// unrelated error -- is reported as the packet already being present.
404 if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
405 ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string {
406 return logMsg(les) + ": packet already exists"
// Same test for the marker file carrying nncp.SeenSuffix.
410 if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) {
411 ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string {
412 return logMsg(les) + ": packet already seen"
// Receive path, part 5: the two variants that verify the checksum. Which
// flags select each variant is not visible here -- presumably -check
// combined with -dryrun for the first and -check alone for the second.
//
// Variant A: hash only, nothing is written. The header bytes already read go
// into the hasher first, then the rest of the tar entry.
418 hsh, err := blake2b.New256(nil)
420 log.Fatalln("Error during hasher creation:", err)
422 if _, err = hsh.Write(pktEncBuf); err != nil {
423 log.Fatalln("Error during writing:", err)
425 if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil {
426 log.Fatalln("Error during copying:", err)
// The Base32 digest has to equal the packet's file name.
428 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
429 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
// Variant B: write through a temporary file that exposes a writer (tmp.W)
// and a running hash (tmp.Hsh); it is committed into dstDirPath only when
// the digest matches.
433 tmp, err := ctx.NewTmpFileWHash()
435 log.Fatalln("Error during temporary file creation:", err)
437 if _, err = tmp.W.Write(pktEncBuf); err != nil {
438 log.Fatalln("Error during writing:", err)
440 if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil {
441 log.Fatalln("Error during copying:", err)
// NOTE(review): "flusing" and "commiting" below are misspelled; they are
// runtime messages and are left untouched.
443 if err = tmp.W.Flush(); err != nil {
444 log.Fatalln("Error during flusing:", err)
446 if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName {
447 if err = tmp.Commit(dstDirPath); err != nil {
448 log.Fatalln("Error during commiting:", err)
451 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
// Receive path, part 6: the variants without checksum verification, followed
// by the final log entry. The selecting guards are not visible -- presumably
// -dryrun for the first and the plain receive for the second.
//
// Variant C: drain the entry into ioutil.Discard so the tar reader advances;
// nothing is stored.
458 if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
459 log.Fatalln("Error during copying:", err)
// Variant D: store the packet. It is written to a temporary file through a
// CopyBufSize buffer, starting with the header bytes that were already read.
462 tmp, err := ctx.NewTmpFile()
464 log.Fatalln("Error during temporary file creation:", err)
466 bufTmp := bufio.NewWriterSize(tmp, CopyBufSize)
467 if _, err = bufTmp.Write(pktEncBuf); err != nil {
468 log.Fatalln("Error during writing:", err)
470 if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
471 log.Fatalln("Error during copying:", err)
// Flush, sync and close the file before it is moved into place.
473 if err = bufTmp.Flush(); err != nil {
474 log.Fatalln("Error during flushing:", err)
476 if err = tmp.Sync(); err != nil {
477 log.Fatalln("Error during syncing:", err)
479 if err = tmp.Close(); err != nil {
480 log.Fatalln("Error during closing:", err)
// Ensure the destination directory exists, rename the temporary file to its
// final name, then sync the directory.
482 if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil {
483 log.Fatalln("Error during mkdir:", err)
485 if err = os.Rename(tmp.Name(), dstPath); err != nil {
486 log.Fatalln("Error during renaming:", err)
488 if err = nncp.DirSync(dstDirPath); err != nil {
489 log.Fatalln("Error during syncing:", err)
// Hand the header bytes and destination path to ctx.HdrWrite (presumably
// only when header files are enabled; the guard is not visible).
492 ctx.HdrWrite(pktEncBuf, dstPath)
// Duplicate the "FullSize" field under the key "Size" for the final log
// entry. The range expression is evaluated once, so appending inside the
// loop does not extend the iteration.
496 for _, le := range les {
497 if le.K == "FullSize" {
498 les = append(les, nncp.LE{K: "Size", V: le.V})
502 ctx.LogI("bundle-rx", les, func(les nncp.LEs) string {
504 "Bundle transfer, received from %s %s (%s)",
505 sender, pktName, humanize.IBytes(uint64(entry.Size)),