2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
51 "From: " + fromTo.From,
53 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
59 "Content-Type: text/plain; charset=utf-8",
60 "Content-Transfer-Encoding: base64",
62 base64.StdEncoding.EncodeToString(body),
65 return strings.NewReader(strings.Join(lines, "\n"))
68 func pktSizeWithoutEnc(pktSize int64) int64 {
69 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
70 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
71 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
72 pktSize -= poly1305.TagSize
74 pktSize -= pktSizeBlocks * poly1305.TagSize
78 var JobRepeatProcess = errors.New("needs processing repeat")
89 decompressor *zstd.Decoder,
90 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
93 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
95 _, err := xdr.Unmarshal(pipeR, &pkt)
97 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
98 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
102 les = append(les, LE{"Size", int64(pktSize)})
103 ctx.LogD("rx", les, func(les LEs) string {
105 "Tossing %s/%s (%s)",
106 sender.Name, pktName,
107 humanize.IBytes(pktSize),
111 case PktTypeExec, PktTypeExecFat:
115 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116 handle := string(path[0])
117 args := make([]string, 0, len(path)-1)
118 for _, p := range path[1:] {
119 args = append(args, string(p))
121 argsStr := strings.Join(append([]string{handle}, args...), " ")
122 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
123 cmdline, exists := sender.Exec[handle]
124 if !exists || len(cmdline) == 0 {
125 err = errors.New("No handle found")
127 "rx-no-handle", les, err,
128 func(les LEs) string {
130 "Tossing exec %s/%s (%s): %s",
131 sender.Name, pktName,
132 humanize.IBytes(pktSize), argsStr,
138 if pkt.Type == PktTypeExec {
139 if err = decompressor.Reset(pipeR); err != nil {
144 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
147 "NNCP_SELF="+ctx.Self.Id.String(),
148 "NNCP_SENDER="+sender.Id.String(),
149 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
151 if pkt.Type == PktTypeExec {
152 cmd.Stdin = decompressor
156 output, err := cmd.Output()
158 ctx.LogE("rx-hande", les, err, func(les LEs) string {
160 "Tossing exec %s/%s (%s): %s: handling",
161 sender.Name, pktName,
162 humanize.IBytes(uint64(pktSize)), argsStr,
167 if len(sendmail) > 0 && ctx.NotifyExec != nil {
168 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
170 notify, exists = ctx.NotifyExec["*."+handle]
175 append(sendmail[1:], notify.To)...,
177 cmd.Stdin = newNotification(notify, fmt.Sprintf(
178 "Exec from %s: %s", sender.Name, argsStr,
180 if err = cmd.Run(); err != nil {
181 ctx.LogE("rx-notify", les, err, func(les LEs) string {
183 "Tossing exec %s/%s (%s): %s: notifying",
184 sender.Name, pktName,
185 humanize.IBytes(pktSize), argsStr,
192 ctx.LogI("rx", les, func(les LEs) string {
194 "Got exec from %s to %s (%s)",
195 sender.Name, argsStr,
196 humanize.IBytes(pktSize),
199 if !dryRun && jobPath != "" {
201 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
202 fd.Close() // #nosec G104
205 if err = os.Remove(jobPath); err != nil {
206 ctx.LogE("rx-notify", les, err, func(les LEs) string {
208 "Tossing exec %s/%s (%s): %s: notifying",
209 sender.Name, pktName,
210 humanize.IBytes(pktSize), argsStr,
214 } else if ctx.HdrUsage {
215 os.Remove(jobPath + HdrSuffix)
223 dst := string(pkt.Path[:int(pkt.PathLen)])
224 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
225 if filepath.IsAbs(dst) {
226 err = errors.New("non-relative destination path")
228 "rx-non-rel", les, err,
229 func(les LEs) string {
231 "Tossing file %s/%s (%s): %s",
232 sender.Name, pktName,
233 humanize.IBytes(pktSize), dst,
239 incoming := sender.Incoming
241 err = errors.New("incoming is not allowed")
243 "rx-no-incoming", les, err,
244 func(les LEs) string {
246 "Tossing file %s/%s (%s): %s",
247 sender.Name, pktName,
248 humanize.IBytes(pktSize), dst,
254 dir := filepath.Join(*incoming, path.Dir(dst))
255 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
256 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
258 "Tossing file %s/%s (%s): %s: mkdir",
259 sender.Name, pktName,
260 humanize.IBytes(pktSize), dst,
266 tmp, err := TempFile(dir, "file")
268 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
270 "Tossing file %s/%s (%s): %s: mktemp",
271 sender.Name, pktName,
272 humanize.IBytes(pktSize), dst,
277 les = append(les, LE{"Tmp", tmp.Name()})
278 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
280 "Tossing file %s/%s (%s): %s: created: %s",
281 sender.Name, pktName,
282 humanize.IBytes(pktSize), dst, tmp.Name(),
285 bufW := bufio.NewWriter(tmp)
286 if _, err = CopyProgressed(
287 bufW, pipeR, "Rx file",
288 append(les, LE{"FullSize", int64(pktSize)}),
291 ctx.LogE("rx-copy", les, err, func(les LEs) string {
293 "Tossing file %s/%s (%s): %s: copying",
294 sender.Name, pktName,
295 humanize.IBytes(pktSize), dst,
300 if err = bufW.Flush(); err != nil {
301 tmp.Close() // #nosec G104
302 ctx.LogE("rx-flush", les, err, func(les LEs) string {
304 "Tossing file %s/%s (%s): %s: flushing",
305 sender.Name, pktName,
306 humanize.IBytes(pktSize), dst,
311 if err = tmp.Sync(); err != nil {
312 tmp.Close() // #nosec G104
313 ctx.LogE("rx-sync", les, err, func(les LEs) string {
315 "Tossing file %s/%s (%s): %s: syncing",
316 sender.Name, pktName,
317 humanize.IBytes(pktSize), dst,
322 if err = tmp.Close(); err != nil {
323 ctx.LogE("rx-close", les, err, func(les LEs) string {
325 "Tossing file %s/%s (%s): %s: closing",
326 sender.Name, pktName,
327 humanize.IBytes(pktSize), dst,
332 dstPathOrig := filepath.Join(*incoming, dst)
333 dstPath := dstPathOrig
336 if _, err = os.Stat(dstPath); err != nil {
337 if os.IsNotExist(err) {
340 ctx.LogE("rx-stat", les, err, func(les LEs) string {
342 "Tossing file %s/%s (%s): %s: stating: %s",
343 sender.Name, pktName,
344 humanize.IBytes(pktSize), dst, dstPath,
349 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
352 if err = os.Rename(tmp.Name(), dstPath); err != nil {
353 ctx.LogE("rx-rename", les, err, func(les LEs) string {
355 "Tossing file %s/%s (%s): %s: renaming",
356 sender.Name, pktName,
357 humanize.IBytes(pktSize), dst,
362 if err = DirSync(*incoming); err != nil {
363 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
365 "Tossing file %s/%s (%s): %s: dirsyncing",
366 sender.Name, pktName,
367 humanize.IBytes(pktSize), dst,
372 les = les[:len(les)-1] // delete Tmp
374 ctx.LogI("rx", les, func(les LEs) string {
376 "Got file %s (%s) from %s",
377 dst, humanize.IBytes(pktSize), sender.Name,
383 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
384 fd.Close() // #nosec G104
387 if err = os.Remove(jobPath); err != nil {
388 ctx.LogE("rx-remove", les, err, func(les LEs) string {
390 "Tossing file %s/%s (%s): %s: removing",
391 sender.Name, pktName,
392 humanize.IBytes(pktSize), dst,
396 } else if ctx.HdrUsage {
397 os.Remove(jobPath + HdrSuffix)
400 if len(sendmail) > 0 && ctx.NotifyFile != nil {
403 append(sendmail[1:], ctx.NotifyFile.To)...,
405 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
406 "File from %s: %s (%s)",
407 sender.Name, dst, humanize.IBytes(pktSize),
409 if err = cmd.Run(); err != nil {
410 ctx.LogE("rx-notify", les, err, func(les LEs) string {
412 "Tossing file %s/%s (%s): %s: notifying",
413 sender.Name, pktName,
414 humanize.IBytes(pktSize), dst,
425 src := string(pkt.Path[:int(pkt.PathLen)])
426 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
427 if filepath.IsAbs(src) {
428 err = errors.New("non-relative source path")
430 "rx-non-rel", les, err,
431 func(les LEs) string {
433 "Tossing freq %s/%s (%s): %s: notifying",
434 sender.Name, pktName,
435 humanize.IBytes(pktSize), src,
441 dstRaw, err := ioutil.ReadAll(pipeR)
443 ctx.LogE("rx-read", les, err, func(les LEs) string {
445 "Tossing freq %s/%s (%s): %s: reading",
446 sender.Name, pktName,
447 humanize.IBytes(pktSize), src,
452 dst := string(dstRaw)
453 les = append(les, LE{"Dst", dst})
454 freqPath := sender.FreqPath
456 err = errors.New("freqing is not allowed")
458 "rx-no-freq", les, err,
459 func(les LEs) string {
461 "Tossing freq %s/%s (%s): %s -> %s",
462 sender.Name, pktName,
463 humanize.IBytes(pktSize), src, dst,
473 filepath.Join(*freqPath, src),
481 ctx.LogE("rx-tx", les, err, func(les LEs) string {
483 "Tossing freq %s/%s (%s): %s -> %s: txing",
484 sender.Name, pktName,
485 humanize.IBytes(pktSize), src, dst,
491 ctx.LogI("rx", les, func(les LEs) string {
492 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
497 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
498 fd.Close() // #nosec G104
501 if err = os.Remove(jobPath); err != nil {
502 ctx.LogE("rx-remove", les, err, func(les LEs) string {
504 "Tossing freq %s/%s (%s): %s -> %s: removing",
505 sender.Name, pktName,
506 humanize.IBytes(pktSize), src, dst,
510 } else if ctx.HdrUsage {
511 os.Remove(jobPath + HdrSuffix)
514 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
517 append(sendmail[1:], ctx.NotifyFreq.To)...,
519 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
520 "Freq from %s: %s", sender.Name, src,
522 if err = cmd.Run(); err != nil {
523 ctx.LogE("rx-notify", les, err, func(les LEs) string {
525 "Tossing freq %s/%s (%s): %s -> %s: notifying",
526 sender.Name, pktName,
527 humanize.IBytes(pktSize), src, dst,
538 dst := new([MTHSize]byte)
539 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
540 nodeId := NodeId(*dst)
541 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
542 logMsg := func(les LEs) string {
544 "Tossing trns %s/%s (%s): %s",
545 sender.Name, pktName,
546 humanize.IBytes(pktSize),
550 node := ctx.Neigh[nodeId]
552 err = errors.New("unknown node")
553 ctx.LogE("rx-unknown", les, err, logMsg)
556 ctx.LogD("rx-tx", les, logMsg)
558 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
559 ctx.LogE("rx", les, err, func(les LEs) string {
560 return logMsg(les) + ": txing"
565 ctx.LogI("rx", les, func(les LEs) string {
567 "Got transitional packet from %s to %s (%s)",
569 ctx.NodeName(&nodeId),
570 humanize.IBytes(pktSize),
573 if !dryRun && jobPath != "" {
575 if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
576 fd.Close() // #nosec G104
579 if err = os.Remove(jobPath); err != nil {
580 ctx.LogE("rx", les, err, func(les LEs) string {
582 "Tossing trns %s/%s (%s): %s: removing",
583 sender.Name, pktName,
584 humanize.IBytes(pktSize),
585 ctx.NodeName(&nodeId),
589 } else if ctx.HdrUsage {
590 os.Remove(jobPath + HdrSuffix)
598 areaId := new(AreaId)
599 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
600 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
601 logMsg := func(les LEs) string {
603 "Tossing %s/%s (%s): area %s",
604 sender.Name, pktName,
605 humanize.IBytes(pktSize),
606 ctx.AreaName(areaId),
609 area := ctx.AreaId2Area[*areaId]
611 err = errors.New("unknown area")
612 ctx.LogE("rx-area-unknown", les, err, logMsg)
615 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
616 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
618 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
621 msgHashRaw := blake2b.Sum256(pktEncRaw)
622 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
623 les = append(les, LE{"AreaMsg", msgHash})
624 ctx.LogD("rx-area", les, logMsg)
627 for _, nodeId := range area.Subs {
628 node := ctx.Neigh[*nodeId]
629 lesEcho := append(les, LE{"Echo", nodeId})
630 seenDir := filepath.Join(
631 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
633 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
634 logMsgNode := func(les LEs) string {
636 "%s: echoing to: %s", logMsg(les), node.Name,
639 if _, err := os.Stat(seenPath); err == nil {
640 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
641 return logMsgNode(les) + ": already sent"
645 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
648 for _, nodeId := range area.Subs {
649 node := ctx.Neigh[*nodeId]
650 lesEcho := append(les, LE{"Echo", nodeId})
651 seenDir := filepath.Join(
652 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
654 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
655 logMsgNode := func(les LEs) string {
656 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
658 if _, err := os.Stat(seenPath); err == nil {
659 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
660 return logMsgNode(les) + ": already sent"
664 if nodeId != sender.Id {
665 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
667 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
669 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
673 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
674 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
677 if fd, err := os.Create(seenPath); err == nil {
679 if err = DirSync(seenDir); err != nil {
680 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
684 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
687 return JobRepeatProcess
691 seenDir := filepath.Join(
692 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
694 seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
695 if _, err := os.Stat(seenPath); err == nil {
696 ctx.LogD("rx-area-seen", les, func(les LEs) string {
697 return logMsg(les) + ": already seen"
699 if !dryRun && jobPath != "" {
700 if err = os.Remove(jobPath); err != nil {
701 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
703 "Tossing area %s/%s (%s): %s: removing",
704 sender.Name, pktName,
705 humanize.IBytes(pktSize),
710 } else if ctx.HdrUsage {
711 os.Remove(jobPath + HdrSuffix)
718 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
719 return logMsg(les) + ": no private key for decoding"
722 signatureVerify := true
723 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
724 if !area.AllowUnknown {
725 err = errors.New("unknown sender")
728 append(les, LE{"Sender", pktEnc.Sender}),
730 func(les LEs) string {
731 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
736 signatureVerify = false
738 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
739 copy(areaNodeOur.Id[:], area.Id[:])
740 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
744 Incoming: area.Incoming,
747 copy(areaNode.Id[:], area.Id[:])
748 pktName := fmt.Sprintf(
750 Base32Codec.EncodeToString(areaId[:]), msgHash,
753 pipeR, pipeW := io.Pipe()
754 errs := make(chan error, 1)
763 uint64(pktSizeWithoutEnc(int64(pktSize))),
766 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
769 _, _, _, err = PktEncRead(
778 pipeW.CloseWithError(err)
779 go func() { <-errs }()
783 if err = <-errs; err != nil {
788 if !dryRun && jobPath != "" {
789 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
790 ctx.LogE("rx-area-mkdir", les, err, logMsg)
793 if fd, err := os.Create(seenPath); err == nil {
795 if err = DirSync(seenDir); err != nil {
796 ctx.LogE("rx-area-dirsync", les, err, logMsg)
800 if err = os.Remove(jobPath); err != nil {
801 ctx.LogE("rx", les, err, func(les LEs) string {
803 "Tossing area %s/%s (%s): %s: removing",
804 sender.Name, pktName,
805 humanize.IBytes(pktSize),
810 } else if ctx.HdrUsage {
811 os.Remove(jobPath + HdrSuffix)
816 err = errors.New("unknown type")
818 "rx-type-unknown", les, err,
819 func(les LEs) string {
821 "Tossing %s/%s (%s)",
822 sender.Name, pktName, humanize.IBytes(pktSize),
831 func (ctx *Ctx) Toss(
835 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
837 dirLock, err := ctx.LockDir(nodeId, "toss")
841 defer ctx.UnlockDir(dirLock)
843 decompressor, err := zstd.NewReader(nil)
847 defer decompressor.Close()
848 for job := range ctx.Jobs(nodeId, xx) {
849 pktName := filepath.Base(job.Path)
851 {"Node", job.PktEnc.Sender},
853 {"Nice", int(job.PktEnc.Nice)},
855 if job.PktEnc.Nice > nice {
856 ctx.LogD("rx-too-nice", les, func(les LEs) string {
858 "Tossing %s/%s: too nice: %s",
859 ctx.NodeName(job.PktEnc.Sender), pktName,
860 NicenessFmt(job.PktEnc.Nice),
865 fd, err := os.Open(job.Path)
867 ctx.LogE("rx-open", les, err, func(les LEs) string {
869 "Tossing %s/%s: opening %s",
870 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
876 errs := make(chan error, 1)
879 pipeR, pipeW := io.Pipe()
886 ctx.Neigh[*job.PktEnc.Sender],
888 uint64(pktSizeWithoutEnc(job.Size)),
891 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
894 pipeWB := bufio.NewWriter(pipeW)
895 sharedKey, _, _, err = PktEncRead(
904 pipeW.CloseWithError(err)
906 if err := pipeWB.Flush(); err != nil {
907 pipeW.CloseWithError(err)
914 go func() { <-errs }()
917 if err = <-errs; err == JobRepeatProcess {
918 if _, err = fd.Seek(0, io.SeekStart); err != nil {
919 ctx.LogE("rx-seek", les, err, func(les LEs) string {
921 "Tossing %s/%s: can not seek",
922 ctx.NodeName(job.PktEnc.Sender),
930 } else if err != nil {
938 func (ctx *Ctx) AutoToss(
941 doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
942 ) (chan struct{}, chan bool) {
943 finish := make(chan struct{})
944 badCode := make(chan bool)
954 time.Sleep(time.Second)
956 nodeId, TRx, nice, false,
957 doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
960 return finish, badCode