Cypherpunks.ru repositories - nncp.git/blob - src/cmd/nncp-bundle/main.go
Use explicitly larger bufio's buffer
[nncp.git] / src / cmd / nncp-bundle / main.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 // Create/digest stream of NNCP encrypted packets.
19 package main
20
21 import (
22         "archive/tar"
23         "bufio"
24         "bytes"
25         "errors"
26         "flag"
27         "fmt"
28         "io"
29         "io/ioutil"
30         "log"
31         "os"
32         "path/filepath"
33         "strings"
34
35         xdr "github.com/davecgh/go-xdr/xdr2"
36         "github.com/dustin/go-humanize"
37         "go.cypherpunks.ru/nncp/v8"
38 )
39
40 func usage() {
41         fmt.Fprintf(os.Stderr, nncp.UsageHeader())
42         fmt.Fprintf(os.Stderr, "nncp-bundle -- Create/digest stream of NNCP encrypted packets\n\n")
43         fmt.Fprintf(os.Stderr, "Usage: %s [options] -tx [-delete] NODE [NODE ...] > ...\n", os.Args[0])
44         fmt.Fprintf(os.Stderr, "       %s [options] -rx -delete [-dryrun] [NODE ...] < ...\n", os.Args[0])
45         fmt.Fprintf(os.Stderr, "       %s [options] -rx [-check] [-dryrun] [NODE ...] < ...\n", os.Args[0])
46         fmt.Fprintln(os.Stderr, "Options:")
47         flag.PrintDefaults()
48 }
49
50 func main() {
51         var (
52                 cfgPath   = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file")
53                 niceRaw   = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness")
54                 doRx      = flag.Bool("rx", false, "Receive packets")
55                 doTx      = flag.Bool("tx", false, "Transfer packets")
56                 doDelete  = flag.Bool("delete", false, "Delete transferred packets")
57                 doCheck   = flag.Bool("check", false, "Check integrity while receiving")
58                 dryRun    = flag.Bool("dryrun", false, "Do no writes")
59                 spoolPath = flag.String("spool", "", "Override path to spool")
60                 logPath   = flag.String("log", "", "Override path to logfile")
61                 quiet     = flag.Bool("quiet", false, "Print only errors")
62                 showPrgrs = flag.Bool("progress", false, "Force progress showing")
63                 omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing")
64                 debug     = flag.Bool("debug", false, "Print debug messages")
65                 version   = flag.Bool("version", false, "Print version information")
66                 warranty  = flag.Bool("warranty", false, "Print warranty information")
67         )
68         log.SetFlags(log.Lshortfile)
69         flag.Usage = usage
70         flag.Parse()
71         if *warranty {
72                 fmt.Println(nncp.Warranty)
73                 return
74         }
75         if *version {
76                 fmt.Println(nncp.VersionGet())
77                 return
78         }
79         nice, err := nncp.NicenessParse(*niceRaw)
80         if err != nil {
81                 log.Fatalln(err)
82         }
83         if *doRx && *doTx {
84                 log.Fatalln("-rx and -tx can not be set simultaneously")
85         }
86         if !*doRx && !*doTx {
87                 log.Fatalln("At least one of -rx and -tx must be specified")
88         }
89
90         ctx, err := nncp.CtxFromCmdline(
91                 *cfgPath,
92                 *spoolPath,
93                 *logPath,
94                 *quiet,
95                 *showPrgrs,
96                 *omitPrgrs,
97                 *debug,
98         )
99         if err != nil {
100                 log.Fatalln("Error during initialization:", err)
101         }
102
103         nodeIds := make(map[nncp.NodeId]struct{}, flag.NArg())
104         for i := 0; i < flag.NArg(); i++ {
105                 node, err := ctx.FindNode(flag.Arg(i))
106                 if err != nil {
107                         log.Fatalln("Invalid node specified:", err)
108                 }
109                 nodeIds[*node.Id] = struct{}{}
110         }
111
112         ctx.Umask()
113
114         if *doTx {
115                 var pktName string
116                 bufStdout := bufio.NewWriter(os.Stdout)
117                 tarWr := tar.NewWriter(bufStdout)
118                 for nodeId := range nodeIds {
119                         for job := range ctx.Jobs(&nodeId, nncp.TTx) {
120                                 pktName = filepath.Base(job.Path)
121                                 les := nncp.LEs{
122                                         {K: "XX", V: string(nncp.TTx)},
123                                         {K: "Node", V: nodeId.String()},
124                                         {K: "Pkt", V: pktName},
125                                 }
126                                 if job.PktEnc.Nice > nice {
127                                         ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string {
128                                                 return fmt.Sprintf(
129                                                         "Bundle transfer %s/tx/%s: too nice %s",
130                                                         ctx.NodeName(&nodeId),
131                                                         pktName,
132                                                         nncp.NicenessFmt(job.PktEnc.Nice),
133                                                 )
134                                         })
135                                         continue
136                                 }
137                                 fd, err := os.Open(job.Path)
138                                 if err != nil {
139                                         log.Fatalln("Error during opening:", err)
140                                 }
141                                 if err = tarWr.WriteHeader(&tar.Header{
142                                         Format:   tar.FormatUSTAR,
143                                         Name:     nncp.NNCPBundlePrefix,
144                                         Mode:     0700,
145                                         Typeflag: tar.TypeDir,
146                                 }); err != nil {
147                                         log.Fatalln("Error writing tar header:", err)
148                                 }
149                                 if err = tarWr.WriteHeader(&tar.Header{
150                                         Format: tar.FormatPAX,
151                                         Name: strings.Join([]string{
152                                                 nncp.NNCPBundlePrefix,
153                                                 nodeId.String(),
154                                                 ctx.SelfId.String(),
155                                                 pktName,
156                                         }, "/"),
157                                         Mode:     0400,
158                                         Size:     job.Size,
159                                         Typeflag: tar.TypeReg,
160                                 }); err != nil {
161                                         log.Fatalln("Error writing tar header:", err)
162                                 }
163                                 if _, err = nncp.CopyProgressed(
164                                         tarWr, bufio.NewReaderSize(fd, nncp.MTHBlockSize), "Tx",
165                                         append(les, nncp.LEs{
166                                                 {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
167                                                 {K: "FullSize", V: job.Size},
168                                         }...),
169                                         ctx.ShowPrgrs,
170                                 ); err != nil {
171                                         log.Fatalln("Error during copying to tar:", err)
172                                 }
173                                 if err = fd.Close(); err != nil {
174                                         log.Fatalln("Error during closing:", err)
175                                 }
176                                 if err = tarWr.Flush(); err != nil {
177                                         log.Fatalln("Error during tar flushing:", err)
178                                 }
179                                 if err = bufStdout.Flush(); err != nil {
180                                         log.Fatalln("Error during stdout flushing:", err)
181                                 }
182                                 if *doDelete {
183                                         if err = os.Remove(job.Path); err != nil {
184                                                 log.Fatalln("Error during deletion:", err)
185                                         } else if ctx.HdrUsage {
186                                                 os.Remove(nncp.JobPath2Hdr(job.Path))
187                                         }
188                                 }
189                                 ctx.LogI(
190                                         "bundle-tx",
191                                         append(les, nncp.LE{K: "Size", V: job.Size}),
192                                         func(les nncp.LEs) string {
193                                                 return fmt.Sprintf(
194                                                         "Bundle transfer, sent to node %s %s (%s)",
195                                                         ctx.NodeName(&nodeId),
196                                                         pktName,
197                                                         humanize.IBytes(uint64(job.Size)),
198                                                 )
199                                         },
200                                 )
201                         }
202                 }
203                 if err = tarWr.Close(); err != nil {
204                         log.Fatalln("Error during tar closing:", err)
205                 }
206         } else {
207                 bufStdin := bufio.NewReaderSize(os.Stdin, nncp.MTHBlockSize*2)
208                 pktEncBuf := make([]byte, nncp.PktEncOverhead)
209                 var pktEnc *nncp.PktEnc
210                 for {
211                         peeked, err := bufStdin.Peek(nncp.MTHBlockSize)
212                         if err != nil && err != io.EOF {
213                                 log.Fatalln("Error during reading:", err)
214                         }
215                         prefixIdx := bytes.Index(peeked, []byte(nncp.NNCPBundlePrefix))
216                         if prefixIdx == -1 {
217                                 if err == io.EOF {
218                                         break
219                                 }
220                                 bufStdin.Discard(bufStdin.Buffered() - (len(nncp.NNCPBundlePrefix) - 1))
221                                 continue
222                         }
223                         if _, err = bufStdin.Discard(prefixIdx); err != nil {
224                                 panic(err)
225                         }
226                         tarR := tar.NewReader(bufStdin)
227                         entry, err := tarR.Next()
228                         if err != nil {
229                                 if err != io.EOF {
230                                         ctx.LogD(
231                                                 "bundle-rx-read-tar",
232                                                 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
233                                                 func(les nncp.LEs) string {
234                                                         return "Bundle transfer rx: reading tar"
235                                                 },
236                                         )
237                                 }
238                                 continue
239                         }
240                         if entry.Typeflag != tar.TypeDir {
241                                 ctx.LogD(
242                                         "bundle-rx-read-tar",
243                                         nncp.LEs{
244                                                 {K: "XX", V: string(nncp.TRx)},
245                                                 {K: "Err", V: errors.New("expected NNCP/")},
246                                         },
247                                         func(les nncp.LEs) string {
248                                                 return "Bundle transfer rx: reading tar"
249                                         },
250                                 )
251                                 continue
252                         }
253                         entry, err = tarR.Next()
254                         if err != nil {
255                                 if err != io.EOF {
256                                         ctx.LogD(
257                                                 "bundle-rx-read-tar",
258                                                 nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
259                                                 func(les nncp.LEs) string {
260                                                         return "Bundle transfer rx: reading tar"
261                                                 },
262                                         )
263                                 }
264                                 continue
265                         }
266                         les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
267                         logMsg := func(les nncp.LEs) string {
268                                 return "Bundle transfer rx/" + entry.Name
269                         }
270                         if entry.Size < nncp.PktEncOverhead {
271                                 ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string {
272                                         return logMsg(les) + ": too small packet"
273                                 })
274                                 continue
275                         }
276                         if !ctx.IsEnoughSpace(entry.Size) {
277                                 ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg)
278                                 continue
279                         }
280                         pktName := filepath.Base(entry.Name)
281                         if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
282                                 ctx.LogD(
283                                         "bundle-rx",
284                                         append(les, nncp.LE{K: "Err", V: "bad packet name"}),
285                                         logMsg,
286                                 )
287                                 continue
288                         }
289                         if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
290                                 ctx.LogD(
291                                         "bundle-rx",
292                                         append(les, nncp.LE{K: "Err", V: err}),
293                                         logMsg,
294                                 )
295                                 continue
296                         }
297                         if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
298                                 ctx.LogD(
299                                         "bundle-rx",
300                                         append(les, nncp.LE{K: "Err", V: "Bad packet structure"}),
301                                         logMsg,
302                                 )
303                                 continue
304                         }
305                         switch pktEnc.Magic {
306                         case nncp.MagicNNCPEv1.B:
307                                 err = nncp.MagicNNCPEv1.TooOld()
308                         case nncp.MagicNNCPEv2.B:
309                                 err = nncp.MagicNNCPEv2.TooOld()
310                         case nncp.MagicNNCPEv3.B:
311                                 err = nncp.MagicNNCPEv3.TooOld()
312                         case nncp.MagicNNCPEv4.B:
313                                 err = nncp.MagicNNCPEv4.TooOld()
314                         case nncp.MagicNNCPEv5.B:
315                                 err = nncp.MagicNNCPEv5.TooOld()
316                         case nncp.MagicNNCPEv6.B:
317                         default:
318                                 err = errors.New("Bad packet magic number")
319                         }
320                         if err != nil {
321                                 ctx.LogD(
322                                         "bundle-rx",
323                                         append(les, nncp.LE{K: "Err", V: err.Error()}),
324                                         logMsg,
325                                 )
326                                 continue
327                         }
328                         if pktEnc.Nice > nice {
329                                 ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string {
330                                         return logMsg(les) + ": too nice"
331                                 })
332                                 continue
333                         }
334                         if *pktEnc.Sender == *ctx.SelfId && *doDelete {
335                                 if len(nodeIds) > 0 {
336                                         if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
337                                                 ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string {
338                                                         return logMsg(les) + ": recipient is not requested"
339                                                 })
340                                                 continue
341                                         }
342                                 }
343                                 nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:])
344                                 les := nncp.LEs{
345                                         {K: "XX", V: string(nncp.TTx)},
346                                         {K: "Node", V: nodeId32},
347                                         {K: "Pkt", V: pktName},
348                                 }
349                                 logMsg = func(les nncp.LEs) string {
350                                         return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName)
351                                 }
352                                 dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
353                                 if _, err = os.Stat(dstPath); err != nil {
354                                         ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string {
355                                                 return logMsg(les) + ": packet is already missing"
356                                         })
357                                         continue
358                                 }
359                                 hsh := nncp.MTHNew(entry.Size, 0)
360                                 if _, err = hsh.Write(pktEncBuf); err != nil {
361                                         log.Fatalln("Error during writing:", err)
362                                 }
363                                 if _, err = nncp.CopyProgressed(
364                                         hsh, tarR, "Rx",
365                                         append(les, nncp.LE{K: "FullSize", V: entry.Size}),
366                                         ctx.ShowPrgrs,
367                                 ); err != nil {
368                                         log.Fatalln("Error during copying:", err)
369                                 }
370                                 if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
371                                         ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string {
372                                                 return logMsg(les) + ": removed"
373                                         })
374                                         if !*dryRun {
375                                                 os.Remove(dstPath)
376                                                 if ctx.HdrUsage {
377                                                         os.Remove(nncp.JobPath2Hdr(dstPath))
378                                                 }
379                                         }
380                                 } else {
381                                         ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg)
382                                 }
383                                 continue
384                         }
385                         if *pktEnc.Recipient != *ctx.SelfId {
386                                 ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string {
387                                         return logMsg(les) + ": unknown recipient"
388                                 })
389                                 continue
390                         }
391                         if len(nodeIds) > 0 {
392                                 if _, exists := nodeIds[*pktEnc.Sender]; !exists {
393                                         ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string {
394                                                 return logMsg(les) + ": sender is not requested"
395                                         })
396                                         continue
397                                 }
398                         }
399                         sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:])
400                         les = nncp.LEs{
401                                 {K: "XX", V: string(nncp.TRx)},
402                                 {K: "Node", V: sender},
403                                 {K: "Pkt", V: pktName},
404                                 {K: "FullSize", V: entry.Size},
405                         }
406                         logMsg = func(les nncp.LEs) string {
407                                 return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName)
408                         }
409                         dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
410                         dstPath := filepath.Join(dstDirPath, pktName)
411                         if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
412                                 ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string {
413                                         return logMsg(les) + ": packet already exists"
414                                 })
415                                 continue
416                         }
417                         if _, err = os.Stat(filepath.Join(
418                                 dstDirPath, nncp.SeenDir, pktName,
419                         )); err == nil || !os.IsNotExist(err) {
420                                 ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string {
421                                         return logMsg(les) + ": packet already seen"
422                                 })
423                                 continue
424                         }
425                         if *doCheck {
426                                 if *dryRun {
427                                         hsh := nncp.MTHNew(entry.Size, 0)
428                                         if _, err = hsh.Write(pktEncBuf); err != nil {
429                                                 log.Fatalln("Error during writing:", err)
430                                         }
431                                         if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil {
432                                                 log.Fatalln("Error during copying:", err)
433                                         }
434                                         if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
435                                                 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
436                                                 continue
437                                         }
438                                 } else {
439                                         tmp, err := ctx.NewTmpFileWHash()
440                                         if err != nil {
441                                                 log.Fatalln("Error during temporary file creation:", err)
442                                         }
443                                         if _, err = tmp.W.Write(pktEncBuf); err != nil {
444                                                 log.Fatalln("Error during writing:", err)
445                                         }
446                                         if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil {
447                                                 log.Fatalln("Error during copying:", err)
448                                         }
449                                         if err = tmp.W.Flush(); err != nil {
450                                                 log.Fatalln("Error during flusing:", err)
451                                         }
452                                         if nncp.Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) == pktName {
453                                                 if err = tmp.Commit(dstDirPath); err != nil {
454                                                         log.Fatalln("Error during commiting:", err)
455                                                 }
456                                         } else {
457                                                 ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
458                                                 tmp.Cancel()
459                                                 continue
460                                         }
461                                 }
462                         } else {
463                                 if *dryRun {
464                                         if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
465                                                 log.Fatalln("Error during copying:", err)
466                                         }
467                                 } else {
468                                         tmp, err := ctx.NewTmpFile()
469                                         if err != nil {
470                                                 log.Fatalln("Error during temporary file creation:", err)
471                                         }
472                                         bufTmp := bufio.NewWriterSize(tmp, nncp.MTHBlockSize)
473                                         if _, err = bufTmp.Write(pktEncBuf); err != nil {
474                                                 log.Fatalln("Error during writing:", err)
475                                         }
476                                         if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
477                                                 log.Fatalln("Error during copying:", err)
478                                         }
479                                         if err = bufTmp.Flush(); err != nil {
480                                                 log.Fatalln("Error during flushing:", err)
481                                         }
482                                         if !nncp.NoSync {
483                                                 if err = tmp.Sync(); err != nil {
484                                                         log.Fatalln("Error during syncing:", err)
485                                                 }
486                                         }
487                                         if err = tmp.Close(); err != nil {
488                                                 log.Fatalln("Error during closing:", err)
489                                         }
490                                         if err = os.MkdirAll(dstDirPath, os.FileMode(0777)); err != nil {
491                                                 log.Fatalln("Error during mkdir:", err)
492                                         }
493                                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
494                                                 log.Fatalln("Error during renaming:", err)
495                                         }
496                                         if err = nncp.DirSync(dstDirPath); err != nil {
497                                                 log.Fatalln("Error during syncing:", err)
498                                         }
499                                         if ctx.HdrUsage {
500                                                 ctx.HdrWrite(pktEncBuf, dstPath)
501                                         }
502                                 }
503                         }
504                         for _, le := range les {
505                                 if le.K == "FullSize" {
506                                         les = append(les, nncp.LE{K: "Size", V: le.V})
507                                         break
508                                 }
509                         }
510                         ctx.LogI("bundle-rx", les, func(les nncp.LEs) string {
511                                 return fmt.Sprintf(
512                                         "Bundle transfer, received from %s %s (%s)",
513                                         sender, pktName, humanize.IBytes(uint64(entry.Size)),
514                                 )
515                         })
516                 }
517         }
518 }