2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 // Create/digest stream of NNCP encrypted packets.
35 xdr "github.com/davecgh/go-xdr/xdr2"
36 "github.com/dustin/go-humanize"
37 "go.cypherpunks.ru/nncp/v8"
41 fmt.Fprintf(os.Stderr, nncp.UsageHeader())
42 fmt.Fprintf(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n\n")
43 fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0])
44 fmt.Fprintf(os.Stderr, " %s [options] -rx -delete [-dryrun] [NODE ...] < ...\n", os.Args[0])
45 fmt.Fprintf(os.Stderr, " %s [options] -rx [-check] [-dryrun] [NODE ...] < ...\n", os.Args[0])
46 fmt.Fprintln(os.Stderr, "Options:")
52 cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
53 niceRaw = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness")
54 doRx = flag.Bool("rx", false, "Receive packets")
55 doTx = flag.Bool("tx", false, "Transfer packets")
56 doDelete = flag.Bool("delete", false, "Delete transferred packets")
57 doCheck = flag.Bool("check", false, "Check integrity while receiving")
58 dryRun = flag.Bool("dryrun", false, "Do no writes")
59 spoolPath = flag.String("spool", "", "Override path to spool")
60 logPath = flag.String("log", "", "Override path to logfile")
61 quiet = flag.Bool("quiet", false, "Print only errors")
62 showPrgrs = flag.Bool("progress", false, "Force progress showing")
63 omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing")
64 debug = flag.Bool("debug", false, "Print debug messages")
65 version = flag.Bool("version", false, "Print version information")
66 warranty = flag.Bool("warranty", false, "Print warranty information")
68 log.SetFlags(log.Lshortfile)
72 fmt.Println(nncp.Warranty)
76 fmt.Println(nncp.VersionGet())
79 nice, err := nncp.NicenessParse(*niceRaw)
84 log.Fatalln("-rx and -tx can not be set simultaneously")
87 log.Fatalln("At least one of -rx and -tx must be specified")
90 ctx, err := nncp.CtxFromCmdline(
100 log.Fatalln("Error during initialization:", err)
103 nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg())
104 for i := 0; i < flag.NArg(); i++ {
105 node, err := ctx.FindNode(flag.Arg(i))
107 log.Fatalln("Invalid node specified:", err)
109 nodeIds[*node.Id] = struct{}{}
116 bufStdout := bufio.NewWriter(os.Stdout)
117 tarWr := tar.NewWriter(bufStdout)
118 for nodeId := range nodeIds {
119 for job := range ctx.Jobs(&nodeId, nncp.TTx) {
120 pktName = filepath.Base(job.Path)
122 {K: "XX", V: string(nncp.TTx)},
123 {K: "Node", V: nodeId.String()},
124 {K: "Pkt", V: pktName},
126 if job.PktEnc.Nice > nice {
127 ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string {
129 "Bundle transfer %s/tx/%s: too nice %s",
130 ctx.NodeName(&nodeId),
132 nncp.NicenessFmt(job.PktEnc.Nice),
137 fd, err := os.Open(job.Path)
139 log.Fatalln("Error during opening:", err)
141 if err = tarWr.WriteHeader(&tar.Header{
142 Format: tar.FormatUSTAR,
143 Name: nncp.NNCPBundlePrefix,
145 Typeflag: tar.TypeDir,
147 log.Fatalln("Error writing tar header:", err)
149 if err = tarWr.WriteHeader(&tar.Header{
150 Format: tar.FormatPAX,
151 Name: strings.Join([]string{
152 nncp.NNCPBundlePrefix,
159 Typeflag: tar.TypeReg,
161 log.Fatalln("Error writing tar header:", err)
163 if _, err = nncp.CopyProgressed(
164 tarWr, bufio.NewReaderSize(fd, nncp.MTHBlockSize), "Tx",
165 append(les, nncp.LEs{
166 {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
167 {K: "FullSize", V: job.Size},
171 log.Fatalln("Error during copying to tar:", err)
173 if err = fd.Close(); err != nil {
174 log.Fatalln("Error during closing:", err)
176 if err = tarWr.Flush(); err != nil {
177 log.Fatalln("Error during tar flushing:", err)
179 if err = bufStdout.Flush(); err != nil {
180 log.Fatalln("Error during stdout flushing:", err)
183 if err = os.Remove(job.Path); err != nil {
184 log.Fatalln("Error during deletion:", err)
185 } else if ctx.HdrUsage {
186 os.Remove(nncp.JobPath2Hdr(job.Path))
191 append(les, nncp.LE{K: "Size", V: job.Size}),
192 func(les nncp.LEs) string {
194 "Bundle transfer, sent to node %s %s (%s)",
195 ctx.NodeName(&nodeId),
197 humanize.IBytes(uint64(job.Size)),
203 if err = tarWr.Close(); err != nil {
204 log.Fatalln("Error during tar closing:", err)
207 bufStdin := bufio.NewReaderSize(os.Stdin, nncp.MTHBlockSize*2)
208 pktEncBuf := make([]byte, nncp.PktEncOverhead)
209 var pktEnc *nncp.PktEnc
211 peeked, err := bufStdin.Peek(nncp.MTHBlockSize)
212 if err != nil && err != io.EOF {
213 log.Fatalln("Error during reading:", err)
215 prefixIdx := bytes.Index(peeked, []byte(nncp.NNCPBundlePrefix))
220 bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1))
223 if _, err = bufStdin.Discard(prefixIdx); err != nil {
226 tarR := tar.NewReader(bufStdin)
227 entry, err := tarR.Next()
231 "bundle-rx-read-tar",
232 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
233 func(les nncp.LEs) string {
234 return "Bundle transfer rx: reading tar"
240 if entry.Typeflag != tar.TypeDir {
242 "bundle-rx-read-tar",
244 {K: "XX", V: string(nncp.TRx)},
245 {K: "Err", V: errors.New("expected NNCP/")},
247 func(les nncp.LEs) string {
248 return "Bundle transfer rx: reading tar"
253 entry, err = tarR.Next()
257 "bundle-rx-read-tar",
258 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
259 func(les nncp.LEs) string {
260 return "Bundle transfer rx: reading tar"
266 les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
267 logMsg := func(les nncp.LEs) string {
268 return "Bundle transfer rx/" + entry.Name
270 if entry.Size < nncp.PktEncOverhead {
271 ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string {
272 return logMsg(les) + ": too small packet"
276 if !ctx.IsEnoughSpace(entry.Size) {
277 ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg)
280 pktName := filepath.Base(entry.Name)
281 if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
284 append(les, nncp.LE{K: "Err", V: "bad packet name"}),
289 if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
292 append(les, nncp.LE{K: "Err", V: err}),
297 if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
300 append(les, nncp.LE{K: "Err", V: "Bad packet structure"}),
305 switch pktEnc.Magic {
306 case nncp.MagicNNCPEv1.B:
307 err = nncp.MagicNNCPEv1.TooOld()
308 case nncp.MagicNNCPEv2.B:
309 err = nncp.MagicNNCPEv2.TooOld()
310 case nncp.MagicNNCPEv3.B:
311 err = nncp.MagicNNCPEv3.TooOld()
312 case nncp.MagicNNCPEv4.B:
313 err = nncp.MagicNNCPEv4.TooOld()
314 case nncp.MagicNNCPEv5.B:
315 err = nncp.MagicNNCPEv5.TooOld()
316 case nncp.MagicNNCPEv6.B:
318 err = errors.New("Bad packet magic number")
323 append(les, nncp.LE{K: "Err", V: err.Error()}),
328 if pktEnc.Nice > nice {
329 ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string {
330 return logMsg(les) + ": too nice"
334 if *pktEnc.Sender == *ctx.SelfId && *doDelete {
335 if len(nodeIds) > 0 {
336 if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
337 ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string {
338 return logMsg(les) + ": recipient is not requested"
343 nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:])
345 {K: "XX", V: string(nncp.TTx)},
346 {K: "Node", V: nodeId32},
347 {K: "Pkt", V: pktName},
349 logMsg = func(les nncp.LEs) string {
350 return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName)
352 dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
353 if _, err = os.Stat(dstPath); err != nil {
354 ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string {
355 return logMsg(les) + ": packet is already missing"
359 hsh := nncp.MTHNew(entry.Size, 0)
360 if _, err = hsh.Write(pktEncBuf); err != nil {
361 log.Fatalln("Error during writing:", err)
363 if _, err = nncp.CopyProgressed(
365 append(les, nncp.LE{K: "FullSize", V: entry.Size}),
368 log.Fatalln("Error during copying:", err)
370 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
371 ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string {
372 return logMsg(les) + ": removed"
377 os.Remove(nncp.JobPath2Hdr(dstPath))
381 ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg)
385 if *pktEnc.Recipient != *ctx.SelfId {
386 ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string {
387 return logMsg(les) + ": unknown recipient"
391 if len(nodeIds) > 0 {
392 if _, exists := nodeIds[*pktEnc.Sender]; !exists {
393 ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string {
394 return logMsg(les) + ": sender is not requested"
399 sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:])
401 {K: "XX", V: string(nncp.TRx)},
402 {K: "Node", V: sender},
403 {K: "Pkt", V: pktName},
404 {K: "FullSize", V: entry.Size},
406 logMsg = func(les nncp.LEs) string {
407 return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName)
409 dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
410 dstPath := filepath.Join(dstDirPath, pktName)
411 if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
412 ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string {
413 return logMsg(les) + ": packet already exists"
417 if _, err = os.Stat(filepath.Join(
418 dstDirPath, nncp.SeenDir, pktName,
419 )); err == nil || !os.IsNotExist(err) {
420 ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string {
421 return logMsg(les) + ": packet already seen"
427 hsh := nncp.MTHNew(entry.Size, 0)
428 if _, err = hsh.Write(pktEncBuf); err != nil {
429 log.Fatalln("Error during writing:", err)
431 if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil {
432 log.Fatalln("Error during copying:", err)
434 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
435 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
439 tmp, err := ctx.NewTmpFileWHash()
441 log.Fatalln("Error during temporary file creation:", err)
443 if _, err = tmp.W.Write(pktEncBuf); err != nil {
444 log.Fatalln("Error during writing:", err)
446 if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil {
447 log.Fatalln("Error during copying:", err)
449 if err = tmp.W.Flush(); err != nil {
450 log.Fatalln("Error during flusing:", err)
452 if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName {
453 if err = tmp.Commit(dstDirPath); err != nil {
454 log.Fatalln("Error during commiting:", err)
457 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
464 if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
465 log.Fatalln("Error during copying:", err)
468 tmp, err := ctx.NewTmpFile()
470 log.Fatalln("Error during temporary file creation:", err)
472 bufTmp := bufio.NewWriterSize(tmp, nncp.MTHBlockSize)
473 if _, err = bufTmp.Write(pktEncBuf); err != nil {
474 log.Fatalln("Error during writing:", err)
476 if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
477 log.Fatalln("Error during copying:", err)
479 if err = bufTmp.Flush(); err != nil {
480 log.Fatalln("Error during flushing:", err)
483 if err = tmp.Sync(); err != nil {
484 log.Fatalln("Error during syncing:", err)
487 if err = tmp.Close(); err != nil {
488 log.Fatalln("Error during closing:", err)
490 if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil {
491 log.Fatalln("Error during mkdir:", err)
493 if err = os.Rename(tmp.Name(), dstPath); err != nil {
494 log.Fatalln("Error during renaming:", err)
496 if err = nncp.DirSync(dstDirPath); err != nil {
497 log.Fatalln("Error during syncing:", err)
500 ctx.HdrWrite(pktEncBuf, dstPath)
504 for _, le := range les {
505 if le.K == "FullSize" {
506 les = append(les, nncp.LE{K: "Size", V: le.V})
510 ctx.LogI("bundle-rx", les, func(les nncp.LEs) string {
512 "Bundle transfer, received from %s %s (%s)",
513 sender, pktName, humanize.IBytes(uint64(entry.Size)),