2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 // Create/digest stream of NNCP encrypted packets.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/dustin/go-humanize"
37 "go.cypherpunks.ru/nncp/v8"
45 fmt.Fprintf(os.Stderr, nncp.UsageHeader())
46 fmt.Fprintf(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n\n")
47 fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0])
48 fmt.Fprintf(os.Stderr, " %s [options] -rx -delete [-dryrun] [NODE ...] < ...\n", os.Args[0])
49 fmt.Fprintf(os.Stderr, " %s [options] -rx [-check] [-dryrun] [NODE ...] < ...\n", os.Args[0])
50 fmt.Fprintln(os.Stderr, "Options:")
56 cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
57 niceRaw = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness")
58 doRx = flag.Bool("rx", false, "Receive packets")
59 doTx = flag.Bool("tx", false, "Transfer packets")
60 doDelete = flag.Bool("delete", false, "Delete transferred packets")
61 doCheck = flag.Bool("check", false, "Check integrity while receiving")
62 dryRun = flag.Bool("dryrun", false, "Do no writes")
63 spoolPath = flag.String("spool", "", "Override path to spool")
64 logPath = flag.String("log", "", "Override path to logfile")
65 quiet = flag.Bool("quiet", false, "Print only errors")
66 showPrgrs = flag.Bool("progress", false, "Force progress showing")
67 omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing")
68 debug = flag.Bool("debug", false, "Print debug messages")
69 version = flag.Bool("version", false, "Print version information")
70 warranty = flag.Bool("warranty", false, "Print warranty information")
72 log.SetFlags(log.Lshortfile)
76 fmt.Println(nncp.Warranty)
80 fmt.Println(nncp.VersionGet())
83 nice, err := nncp.NicenessParse(*niceRaw)
88 log.Fatalln("-rx and -tx can not be set simultaneously")
91 log.Fatalln("At least one of -rx and -tx must be specified")
94 ctx, err := nncp.CtxFromCmdline(
104 log.Fatalln("Error during initialization:", err)
107 nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg())
108 for i := 0; i < flag.NArg(); i++ {
109 node, err := ctx.FindNode(flag.Arg(i))
111 log.Fatalln("Invalid node specified:", err)
113 nodeIds[*node.Id] = struct{}{}
120 bufStdout := bufio.NewWriter(os.Stdout)
121 tarWr := tar.NewWriter(bufStdout)
122 for nodeId := range nodeIds {
123 for job := range ctx.Jobs(&nodeId, nncp.TTx) {
124 pktName = filepath.Base(job.Path)
126 {K: "XX", V: string(nncp.TTx)},
127 {K: "Node", V: nodeId.String()},
128 {K: "Pkt", V: pktName},
130 if job.PktEnc.Nice > nice {
131 ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string {
133 "Bundle transfer %s/tx/%s: too nice %s",
134 ctx.NodeName(&nodeId),
136 nncp.NicenessFmt(job.PktEnc.Nice),
141 fd, err := os.Open(job.Path)
143 log.Fatalln("Error during opening:", err)
145 if err = tarWr.WriteHeader(&tar.Header{
146 Format: tar.FormatUSTAR,
147 Name: nncp.NNCPBundlePrefix,
149 Typeflag: tar.TypeDir,
151 log.Fatalln("Error writing tar header:", err)
153 if err = tarWr.WriteHeader(&tar.Header{
154 Format: tar.FormatPAX,
155 Name: strings.Join([]string{
156 nncp.NNCPBundlePrefix,
163 Typeflag: tar.TypeReg,
165 log.Fatalln("Error writing tar header:", err)
167 if _, err = nncp.CopyProgressed(
168 tarWr, bufio.NewReader(fd), "Tx",
169 append(les, nncp.LEs{
170 {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
171 {K: "FullSize", V: job.Size},
175 log.Fatalln("Error during copying to tar:", err)
177 if err = fd.Close(); err != nil {
178 log.Fatalln("Error during closing:", err)
180 if err = tarWr.Flush(); err != nil {
181 log.Fatalln("Error during tar flushing:", err)
183 if err = bufStdout.Flush(); err != nil {
184 log.Fatalln("Error during stdout flushing:", err)
187 if err = os.Remove(job.Path); err != nil {
188 log.Fatalln("Error during deletion:", err)
189 } else if ctx.HdrUsage {
190 os.Remove(nncp.JobPath2Hdr(job.Path))
195 append(les, nncp.LE{K: "Size", V: job.Size}),
196 func(les nncp.LEs) string {
198 "Bundle transfer, sent to node %s %s (%s)",
199 ctx.NodeName(&nodeId),
201 humanize.IBytes(uint64(job.Size)),
207 if err = tarWr.Close(); err != nil {
208 log.Fatalln("Error during tar closing:", err)
211 bufStdin := bufio.NewReaderSize(os.Stdin, CopyBufSize*2)
212 pktEncBuf := make([]byte, nncp.PktEncOverhead)
213 var pktEnc *nncp.PktEnc
215 peeked, err := bufStdin.Peek(CopyBufSize)
216 if err != nil && err != io.EOF {
217 log.Fatalln("Error during reading:", err)
219 prefixIdx := bytes.Index(peeked, []byte(nncp.NNCPBundlePrefix))
224 bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1))
227 if _, err = bufStdin.Discard(prefixIdx); err != nil {
230 tarR := tar.NewReader(bufStdin)
231 entry, err := tarR.Next()
235 "bundle-rx-read-tar",
236 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
237 func(les nncp.LEs) string {
238 return "Bundle transfer rx: reading tar"
244 if entry.Typeflag != tar.TypeDir {
246 "bundle-rx-read-tar",
248 {K: "XX", V: string(nncp.TRx)},
249 {K: "Err", V: errors.New("expected NNCP/")},
251 func(les nncp.LEs) string {
252 return "Bundle transfer rx: reading tar"
257 entry, err = tarR.Next()
261 "bundle-rx-read-tar",
262 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
263 func(les nncp.LEs) string {
264 return "Bundle transfer rx: reading tar"
270 les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
271 logMsg := func(les nncp.LEs) string {
272 return "Bundle transfer rx/" + entry.Name
274 if entry.Size < nncp.PktEncOverhead {
275 ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string {
276 return logMsg(les) + ": too small packet"
280 if !ctx.IsEnoughSpace(entry.Size) {
281 ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg)
284 pktName := filepath.Base(entry.Name)
285 if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
288 append(les, nncp.LE{K: "Err", V: "bad packet name"}),
293 if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
296 append(les, nncp.LE{K: "Err", V: err}),
301 if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
304 append(les, nncp.LE{K: "Err", V: "Bad packet structure"}),
309 switch pktEnc.Magic {
310 case nncp.MagicNNCPEv1.B:
311 err = nncp.MagicNNCPEv1.TooOld()
312 case nncp.MagicNNCPEv2.B:
313 err = nncp.MagicNNCPEv2.TooOld()
314 case nncp.MagicNNCPEv3.B:
315 err = nncp.MagicNNCPEv3.TooOld()
316 case nncp.MagicNNCPEv4.B:
317 err = nncp.MagicNNCPEv4.TooOld()
318 case nncp.MagicNNCPEv5.B:
319 err = nncp.MagicNNCPEv5.TooOld()
320 case nncp.MagicNNCPEv6.B:
322 err = errors.New("Bad packet magic number")
327 append(les, nncp.LE{K: "Err", V: err.Error()}),
332 if pktEnc.Nice > nice {
333 ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string {
334 return logMsg(les) + ": too nice"
338 if *pktEnc.Sender == *ctx.SelfId && *doDelete {
339 if len(nodeIds) > 0 {
340 if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
341 ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string {
342 return logMsg(les) + ": recipient is not requested"
347 nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:])
349 {K: "XX", V: string(nncp.TTx)},
350 {K: "Node", V: nodeId32},
351 {K: "Pkt", V: pktName},
353 logMsg = func(les nncp.LEs) string {
354 return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName)
356 dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
357 if _, err = os.Stat(dstPath); err != nil {
358 ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string {
359 return logMsg(les) + ": packet is already missing"
363 hsh := nncp.MTHNew(entry.Size, 0)
364 if _, err = hsh.Write(pktEncBuf); err != nil {
365 log.Fatalln("Error during writing:", err)
367 if _, err = nncp.CopyProgressed(
369 append(les, nncp.LE{K: "FullSize", V: entry.Size}),
372 log.Fatalln("Error during copying:", err)
374 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
375 ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string {
376 return logMsg(les) + ": removed"
381 os.Remove(nncp.JobPath2Hdr(dstPath))
385 ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg)
389 if *pktEnc.Recipient != *ctx.SelfId {
390 ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string {
391 return logMsg(les) + ": unknown recipient"
395 if len(nodeIds) > 0 {
396 if _, exists := nodeIds[*pktEnc.Sender]; !exists {
397 ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string {
398 return logMsg(les) + ": sender is not requested"
403 sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:])
405 {K: "XX", V: string(nncp.TRx)},
406 {K: "Node", V: sender},
407 {K: "Pkt", V: pktName},
408 {K: "FullSize", V: entry.Size},
410 logMsg = func(les nncp.LEs) string {
411 return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName)
413 dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
414 dstPath := filepath.Join(dstDirPath, pktName)
415 if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
416 ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string {
417 return logMsg(les) + ": packet already exists"
421 if _, err = os.Stat(filepath.Join(
422 dstDirPath, nncp.SeenDir, pktName,
423 )); err == nil || !os.IsNotExist(err) {
424 ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string {
425 return logMsg(les) + ": packet already seen"
431 hsh := nncp.MTHNew(entry.Size, 0)
432 if _, err = hsh.Write(pktEncBuf); err != nil {
433 log.Fatalln("Error during writing:", err)
435 if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil {
436 log.Fatalln("Error during copying:", err)
438 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
439 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
443 tmp, err := ctx.NewTmpFileWHash()
445 log.Fatalln("Error during temporary file creation:", err)
447 if _, err = tmp.W.Write(pktEncBuf); err != nil {
448 log.Fatalln("Error during writing:", err)
450 if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil {
451 log.Fatalln("Error during copying:", err)
453 if err = tmp.W.Flush(); err != nil {
454 log.Fatalln("Error during flusing:", err)
456 if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName {
457 if err = tmp.Commit(dstDirPath); err != nil {
458 log.Fatalln("Error during commiting:", err)
461 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
468 if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
469 log.Fatalln("Error during copying:", err)
472 tmp, err := ctx.NewTmpFile()
474 log.Fatalln("Error during temporary file creation:", err)
476 bufTmp := bufio.NewWriterSize(tmp, CopyBufSize)
477 if _, err = bufTmp.Write(pktEncBuf); err != nil {
478 log.Fatalln("Error during writing:", err)
480 if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
481 log.Fatalln("Error during copying:", err)
483 if err = bufTmp.Flush(); err != nil {
484 log.Fatalln("Error during flushing:", err)
486 if err = tmp.Sync(); err != nil {
487 log.Fatalln("Error during syncing:", err)
489 if err = tmp.Close(); err != nil {
490 log.Fatalln("Error during closing:", err)
492 if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil {
493 log.Fatalln("Error during mkdir:", err)
495 if err = os.Rename(tmp.Name(), dstPath); err != nil {
496 log.Fatalln("Error during renaming:", err)
498 if err = nncp.DirSync(dstDirPath); err != nil {
499 log.Fatalln("Error during syncing:", err)
502 ctx.HdrWrite(pktEncBuf, dstPath)
506 for _, le := range les {
507 if le.K == "FullSize" {
508 les = append(les, nncp.LE{K: "Size", V: le.V})
512 ctx.LogI("bundle-rx", les, func(les nncp.LEs) string {
514 "Bundle transfer, received from %s %s (%s)",
515 sender, pktName, humanize.IBytes(uint64(entry.Size)),