/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
import (
"bufio"
"bytes"
+ "encoding/base64"
+ "errors"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
+ "time"
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/dustin/go-humanize"
)
const (
- SeenSuffix = ".seen"
+ SeenDir = "seen"
)
-func newNotification(fromTo *FromToJSON, subject string) io.Reader {
- return strings.NewReader(fmt.Sprintf(
- "From: %s\nTo: %s\nSubject: %s\n",
- fromTo.From,
- fromTo.To,
- mime.BEncoding.Encode("UTF-8", subject),
- ))
+func jobPath2Seen(jobPath string) string {
+ return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
}
-func (ctx *Ctx) Toss(
- nodeId *NodeId,
+func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
+ lines := []string{
+ "From: " + fromTo.From,
+ "To: " + fromTo.To,
+ "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
+ }
+ if len(body) > 0 {
+ lines = append(
+ lines,
+ "MIME-Version: 1.0",
+ "Content-Type: text/plain; charset=utf-8",
+ "Content-Transfer-Encoding: base64",
+ "",
+ base64.StdEncoding.EncodeToString(body),
+ )
+ }
+ return strings.NewReader(strings.Join(lines, "\n"))
+}
+
+func pktSizeWithoutEnc(pktSize int64) int64 {
+ pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
+ pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
+ if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
+ pktSize -= poly1305.TagSize
+ }
+ pktSize -= pktSizeBlocks * poly1305.TagSize
+ return pktSize
+}
+
+var JobRepeatProcess = errors.New("needs processing repeat")
+
+func jobProcess(
+ ctx *Ctx,
+ pipeR *io.PipeReader,
+ pktName string,
+ les LEs,
+ sender *Node,
nice uint8,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
-) bool {
- isBad := false
- decompressor, err := zstd.NewReader(nil)
+ pktSize uint64,
+ jobPath string,
+ decompressor *zstd.Decoder,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+) error {
+ defer pipeR.Close()
+ sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
+ var pkt Pkt
+ _, err := xdr.Unmarshal(pipeR, &pkt)
if err != nil {
- panic(err)
+ ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
+ return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
+ })
+ return err
}
- defer decompressor.Close()
- for job := range ctx.Jobs(nodeId, TRx) {
- pktName := filepath.Base(job.Fd.Name())
- sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
- if job.PktEnc.Nice > nice {
- ctx.LogD("rx", SdsAdd(sds, SDS{
- "nice": strconv.Itoa(int(job.PktEnc.Nice)),
- }), "too nice")
- continue
+ les = append(les, LE{"Size", int64(pktSize)})
+ ctx.LogD("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s)",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ )
+ })
+ switch pkt.Type {
+ case PktTypeExec, PktTypeExecFat:
+ if noExec {
+ return nil
}
- pipeR, pipeW := io.Pipe()
- errs := make(chan error, 1)
- go func(job Job) {
- pipeWB := bufio.NewWriter(pipeW)
- _, _, err := PktEncRead(
- ctx.Self,
- ctx.Neigh,
- bufio.NewReader(job.Fd),
- pipeWB,
+ path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
+ handle := string(path[0])
+ args := make([]string, 0, len(path)-1)
+ for _, p := range path[1:] {
+ args = append(args, string(p))
+ }
+ argsStr := strings.Join(append([]string{handle}, args...), " ")
+ les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
+ cmdline := sender.Exec[handle]
+ if len(cmdline) == 0 {
+ err = errors.New("No handle found")
+ ctx.LogE(
+ "rx-no-handle", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), argsStr,
+ )
+ },
)
- errs <- err
- pipeWB.Flush()
- pipeW.Close()
- job.Fd.Close()
- if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
- }
- }(job)
- var pkt Pkt
- var err error
- var pktSize int64
- var pktSizeBlocks int64
- if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
- isBad = true
- goto Closing
- }
- pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
- pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
- if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
- pktSize -= poly1305.TagSize
- }
- pktSize -= pktSizeBlocks * poly1305.TagSize
- sds["size"] = strconv.FormatInt(pktSize, 10)
- ctx.LogD("rx", sds, "taken")
- switch pkt.Type {
- case PktTypeExec:
- if noExec {
- goto Closing
- }
- path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
- handle := string(path[0])
- args := make([]string, 0, len(path)-1)
- for _, p := range path[1:] {
- args = append(args, string(p))
- }
- sds := SdsAdd(sds, SDS{
- "type": "exec",
- "dst": strings.Join(append([]string{handle}, args...), " "),
- })
- sender := ctx.Neigh[*job.PktEnc.Sender]
- cmdline, exists := sender.Exec[handle]
- if !exists || len(cmdline) == 0 {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
- isBad = true
- goto Closing
- }
+ return err
+ }
+ if pkt.Type == PktTypeExec {
if err = decompressor.Reset(pipeR); err != nil {
log.Fatalln(err)
}
- if !dryRun {
- cmd := exec.Command(
- cmdline[0],
- append(cmdline[1:len(cmdline)], args...)...,
- )
- cmd.Env = append(
- cmd.Env,
- "NNCP_SELF="+ctx.Self.Id.String(),
- "NNCP_SENDER="+sender.Id.String(),
- "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
- )
+ }
+ if !dryRun {
+ cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
+ cmd.Env = append(
+ cmd.Env,
+ "NNCP_SELF="+ctx.Self.Id.String(),
+ "NNCP_SENDER="+sender.Id.String(),
+ "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
+ )
+ if pkt.Type == PktTypeExec {
cmd.Stdin = decompressor
- if err = cmd.Run(); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
- isBad = true
- goto Closing
- }
+ } else {
+ cmd.Stdin = pipeR
}
- ctx.LogI("rx", sds, "")
- if !dryRun {
- if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
+ output, err := cmd.CombinedOutput()
+ if err != nil {
+ les = append(les, LE{"Output", strings.Split(
+ strings.Trim(string(output), "\n"), "\n"),
+ })
+ ctx.LogE("rx-handle", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s: handling",
+ sender.Name, pktName,
+ humanize.IBytes(uint64(pktSize)), argsStr,
+ )
+ })
+ return err
+ }
+ if len(sendmail) > 0 && ctx.NotifyExec != nil {
+ notify := ctx.NotifyExec[sender.Name+"."+handle]
+ if notify == nil {
+ notify = ctx.NotifyExec["*."+handle]
+ }
+ if notify != nil {
+ cmd := exec.Command(
+ sendmail[0],
+ append(sendmail[1:], notify.To)...,
+ )
+ cmd.Stdin = newNotification(notify, fmt.Sprintf(
+ "Exec from %s: %s", sender.Name, argsStr,
+ ), output)
+ if err = cmd.Run(); err != nil {
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s: notifying",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), argsStr,
+ )
+ })
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
- isBad = true
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got exec from %s to %s (%s)",
+ sender.Name, argsStr,
+ humanize.IBytes(pktSize),
+ )
+ })
+ if !dryRun && jobPath != "" {
+ if doSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+ fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
+ }
}
}
- case PktTypeFile:
- if noFile {
- goto Closing
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s: notifying",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), argsStr,
+ )
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
}
- dst := string(pkt.Path[:int(pkt.PathLen)])
- sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
- if filepath.IsAbs(dst) {
- ctx.LogE("rx", sds, "non-relative destination path")
- isBad = true
- goto Closing
+ }
+
+ case PktTypeFile:
+ if noFile {
+ return nil
+ }
+ dst := string(pkt.Path[:int(pkt.PathLen)])
+ les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
+ if filepath.IsAbs(dst) {
+ err = errors.New("non-relative destination path")
+ ctx.LogE(
+ "rx-non-rel", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ },
+ )
+ return err
+ }
+ incoming := sender.Incoming
+ if incoming == nil {
+ err = errors.New("incoming is not allowed")
+ ctx.LogE(
+ "rx-no-incoming", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ },
+ )
+ return err
+ }
+ dir := filepath.Join(*incoming, path.Dir(dst))
+ if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
+ ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: mkdir",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
+ if !dryRun {
+ tmp, err := TempFile(dir, "file")
+ if err != nil {
+ ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: mktemp",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
}
- incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
- if incoming == nil {
- ctx.LogE("rx", sds, "incoming is not allowed")
- isBad = true
- goto Closing
+ les = append(les, LE{"Tmp", tmp.Name()})
+ ctx.LogD("rx-tmp-created", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: created: %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst, tmp.Name(),
+ )
+ })
+ bufW := bufio.NewWriter(tmp)
+ if _, err = CopyProgressed(
+ bufW, pipeR, "Rx file",
+ append(les, LE{"FullSize", int64(pktSize)}),
+ ctx.ShowPrgrs,
+ ); err != nil {
+ ctx.LogE("rx-copy", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: copying",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
}
- dir := filepath.Join(*incoming, path.Dir(dst))
- if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
- isBad = true
- goto Closing
+ if err = bufW.Flush(); err != nil {
+ tmp.Close()
+ ctx.LogE("rx-flush", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: flushing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
}
- if !dryRun {
- tmp, err := TempFile(dir, "file")
- if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
- isBad = true
- goto Closing
- }
- sds["tmp"] = tmp.Name()
- ctx.LogD("rx", sds, "created")
- bufW := bufio.NewWriter(tmp)
- if _, err = io.Copy(bufW, pipeR); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
- isBad = true
- goto Closing
- }
- if err = bufW.Flush(); err != nil {
- tmp.Close()
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
- isBad = true
- goto Closing
- }
- if err = tmp.Sync(); err != nil {
- tmp.Close()
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
- isBad = true
- goto Closing
- }
+ if err = tmp.Sync(); err != nil {
tmp.Close()
- dstPathOrig := filepath.Join(*incoming, dst)
- dstPath := dstPathOrig
- dstPathCtr := 0
- for {
- if _, err = os.Stat(dstPath); err != nil {
- if os.IsNotExist(err) {
- break
+ ctx.LogE("rx-sync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: syncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
+ if err = tmp.Close(); err != nil {
+ ctx.LogE("rx-close", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: closing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
+ dstPathOrig := filepath.Join(*incoming, dst)
+ dstPath := dstPathOrig
+ dstPathCtr := 0
+ for {
+ if _, err = os.Stat(dstPath); err != nil {
+ if os.IsNotExist(err) {
+ break
+ }
+ ctx.LogE("rx-stat", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: stating: %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst, dstPath,
+ )
+ })
+ return err
+ }
+ dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
+ dstPathCtr++
+ }
+ if err = os.Rename(tmp.Name(), dstPath); err != nil {
+ ctx.LogE("rx-rename", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: renaming",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
+ if err = DirSync(*incoming); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
+ les = les[:len(les)-1] // delete Tmp
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got file %s (%s) from %s",
+ dst, humanize.IBytes(pktSize), sender.Name,
+ )
+ })
+ if !dryRun {
+ if jobPath != "" {
+ if doSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+ fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
}
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
- isBad = true
- goto Closing
}
- dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
- dstPathCtr++
}
- if err = os.Rename(tmp.Name(), dstPath); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
- isBad = true
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx-remove", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: removing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+ if len(sendmail) > 0 && ctx.NotifyFile != nil {
+ cmd := exec.Command(
+ sendmail[0],
+ append(sendmail[1:], ctx.NotifyFile.To)...,
+ )
+ cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
+ "File from %s: %s (%s)",
+ sender.Name, dst, humanize.IBytes(pktSize),
+ ), nil)
+ if err = cmd.Run(); err != nil {
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: notifying",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
}
- delete(sds, "tmp")
}
- ctx.LogI("rx", sds, "")
- if !dryRun {
+ }
+
+ case PktTypeFreq:
+ if noFreq {
+ return nil
+ }
+ src := string(pkt.Path[:int(pkt.PathLen)])
+ les := append(les, LE{"Type", "freq"}, LE{"Src", src})
+ if filepath.IsAbs(src) {
+ err = errors.New("non-relative source path")
+ ctx.LogE(
+ "rx-non-rel", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s: notifying",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), src,
+ )
+ },
+ )
+ return err
+ }
+ dstRaw, err := ioutil.ReadAll(pipeR)
+ if err != nil {
+ ctx.LogE("rx-read", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s: reading",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), src,
+ )
+ })
+ return err
+ }
+ dst := string(dstRaw)
+ les = append(les, LE{"Dst", dst})
+ freqPath := sender.FreqPath
+ if freqPath == nil {
+ err = errors.New("freqing is not allowed")
+ ctx.LogE(
+ "rx-no-freq", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), src, dst,
+ )
+ },
+ )
+ return err
+ }
+ if !dryRun {
+ err = ctx.TxFile(
+ sender,
+ pkt.Nice,
+ filepath.Join(*freqPath, src),
+ dst,
+ sender.FreqChunked,
+ sender.FreqMinSize,
+ sender.FreqMaxSize,
+ nil,
+ )
+ if err != nil {
+ ctx.LogE("rx-tx", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s: txing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), src, dst,
+ )
+ })
+ return err
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
+ })
+ if !dryRun {
+ if jobPath != "" {
if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
+ }
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
- isBad = true
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx-remove", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s: removing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), src, dst,
+ )
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
}
- sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
- if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
- cmd := exec.Command(
- sendmail[0],
- append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
- )
- cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
- "File from %s: %s (%s)",
- ctx.Neigh[*job.PktEnc.Sender].Name,
- dst,
- humanize.IBytes(uint64(pktSize)),
- ))
- cmd.Run()
+ }
+ if len(sendmail) > 0 && ctx.NotifyFreq != nil {
+ cmd := exec.Command(
+ sendmail[0],
+ append(sendmail[1:], ctx.NotifyFreq.To)...,
+ )
+ cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
+ "Freq from %s: %s", sender.Name, src,
+ ), nil)
+ if err = cmd.Run(); err != nil {
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s: notifying",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), src, dst,
+ )
+ })
}
}
- case PktTypeFreq:
- if noFreq {
- goto Closing
+ }
+
+ case PktTypeTrns:
+ if noTrns {
+ return nil
+ }
+ dst := new([MTHSize]byte)
+ copy(dst[:], pkt.Path[:int(pkt.PathLen)])
+ nodeId := NodeId(*dst)
+ les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing trns %s/%s (%s): %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ nodeId.String(),
+ )
+ }
+ node := ctx.Neigh[nodeId]
+ if node == nil {
+ err = errors.New("unknown node")
+ ctx.LogE("rx-unknown", les, err, logMsg)
+ return err
+ }
+ ctx.LogD("rx-tx", les, logMsg)
+ if !dryRun {
+ if len(node.Via) == 0 {
+ if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": txing"
+ })
+ return err
+ }
+ } else {
+ via := node.Via[:len(node.Via)-1]
+ node = ctx.Neigh[*node.Via[len(node.Via)-1]]
+ node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
+ pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
+ if err != nil {
+ panic(err)
+ }
+ if _, _, err = ctx.Tx(
+ node,
+ pktTrns,
+ nice,
+ int64(pktSize), 0, MaxFileSize,
+ pipeR,
+ pktName,
+ nil,
+ ); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": txing"
+ })
+ return err
+ }
}
- src := string(pkt.Path[:int(pkt.PathLen)])
- if filepath.IsAbs(src) {
- ctx.LogE("rx", sds, "non-relative source path")
- isBad = true
- goto Closing
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got transitional packet from %s to %s (%s)",
+ sender.Name,
+ ctx.NodeName(&nodeId),
+ humanize.IBytes(pktSize),
+ )
+ })
+ if !dryRun && jobPath != "" {
+ if doSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+ fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
+ }
+ }
}
- sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
- dstRaw, err := ioutil.ReadAll(pipeR)
- if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
- isBad = true
- goto Closing
- }
- dst := string(dstRaw)
- sds["dst"] = dst
- sender := ctx.Neigh[*job.PktEnc.Sender]
- freqPath := sender.FreqPath
- if freqPath == nil {
- ctx.LogE("rx", sds, "freqing is not allowed")
- isBad = true
- goto Closing
- }
- if !dryRun {
- err = ctx.TxFile(
- sender,
- pkt.Nice,
- filepath.Join(*freqPath, src),
- dst,
- sender.FreqChunked,
- sender.FreqMinSize,
- sender.FreqMaxSize,
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing trns %s/%s (%s): %s: removing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ ctx.NodeName(&nodeId),
+ )
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+
+ case PktTypeArea:
+ if noArea {
+ return nil
+ }
+ areaId := new(AreaId)
+ copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
+ les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s): area %s",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ ctx.AreaName(areaId),
+ )
+ }
+ area := ctx.AreaId2Area[*areaId]
+ if area == nil {
+ err = errors.New("unknown area")
+ ctx.LogE("rx-area-unknown", les, err, logMsg)
+ return err
+ }
+ pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
+ fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
+ if err != nil {
+ ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
+ return err
+ }
+ msgHashRaw := blake2b.Sum256(pktEncRaw)
+ msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
+ les = append(les, LE{"AreaMsg", msgHash})
+ ctx.LogD("rx-area", les, logMsg)
+
+ if dryRun {
+ for _, nodeId := range area.Subs {
+ node := ctx.Neigh[*nodeId]
+ lesEcho := append(les, LE{"Echo", nodeId})
+ seenDir := filepath.Join(
+ ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
)
- if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
- isBad = true
- goto Closing
+ seenPath := filepath.Join(seenDir, msgHash)
+ logMsgNode := func(les LEs) string {
+ return fmt.Sprintf(
+ "%s: echoing to: %s", logMsg(les), node.Name,
+ )
+ }
+ if _, err := os.Stat(seenPath); err == nil {
+ ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
+ return logMsgNode(les) + ": already sent"
+ })
+ continue
}
+ ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
}
- ctx.LogI("rx", sds, "")
- if !dryRun {
- if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
+ } else {
+ for _, nodeId := range area.Subs {
+ node := ctx.Neigh[*nodeId]
+ lesEcho := append(les, LE{"Echo", nodeId})
+ seenDir := filepath.Join(
+ ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
+ )
+ seenPath := filepath.Join(seenDir, msgHash)
+ logMsgNode := func(les LEs) string {
+ return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
+ }
+ if _, err := os.Stat(seenPath); err == nil {
+ ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
+ return logMsgNode(les) + ": already sent"
+ })
+ continue
+ }
+ if nodeId != sender.Id && nodeId != pktEnc.Sender {
+ ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
+ if _, _, err = ctx.Tx(
+ node,
+ &pkt,
+ nice,
+ int64(pktSize), 0, MaxFileSize,
+ fullPipeR,
+ pktName,
+ nil,
+ ); err != nil {
+ ctx.LogE("rx-area", lesEcho, err, logMsgNode)
+ return err
}
}
- if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
- isBad = true
+ if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
+ ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
+ return err
}
- sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
- if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
- cmd := exec.Command(
- sendmail[0],
- append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
- )
- cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
- "Freq from %s: %s",
- ctx.Neigh[*job.PktEnc.Sender].Name,
- src,
- ))
- cmd.Run()
+ if fd, err := os.Create(seenPath); err == nil {
+ fd.Close()
+ if err = DirSync(seenDir); err != nil {
+ ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
+ return err
+ }
+ } else {
+ ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
+ return err
}
+ return JobRepeatProcess
}
- case PktTypeTrns:
- if noTrns {
- goto Closing
- }
- dst := new([blake2b.Size256]byte)
- copy(dst[:], pkt.Path[:int(pkt.PathLen)])
- nodeId := NodeId(*dst)
- node, known := ctx.Neigh[nodeId]
- sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
- if !known {
- ctx.LogE("rx", sds, "unknown node")
- isBad = true
- goto Closing
- }
- ctx.LogD("rx", sds, "taken")
- if !dryRun {
- if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
- isBad = true
- goto Closing
+ }
+
+ seenDir := filepath.Join(
+ ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
+ )
+ seenPath := filepath.Join(seenDir, msgHash)
+ if _, err := os.Stat(seenPath); err == nil {
+ ctx.LogD("rx-area-seen", les, func(les LEs) string {
+ return logMsg(les) + ": already seen"
+ })
+ if !dryRun && jobPath != "" {
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing area %s/%s (%s): %s: removing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ msgHash,
+ )
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
}
}
- ctx.LogI("rx", sds, "")
- if !dryRun {
- if doSeen {
- if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
- }
+ return nil
+ }
+
+ if area.Prv == nil {
+ ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
+ return logMsg(les) + ": no private key for decoding"
+ })
+ } else {
+ signatureVerify := true
+ if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
+ if !area.AllowUnknown {
+ err = errors.New("unknown sender")
+ ctx.LogE(
+ "rx-area-unknown",
+ append(les, LE{"Sender", pktEnc.Sender}),
+ err,
+ func(les LEs) string {
+ return logMsg(les) + ": sender: " + pktEnc.Sender.String()
+ },
+ )
+ return err
}
- if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
- isBad = true
+ signatureVerify = false
+ }
+ areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
+ copy(areaNodeOur.Id[:], area.Id[:])
+ copy(areaNodeOur.ExchPrv[:], area.Prv[:])
+ areaNode := Node{
+ Id: new(NodeId),
+ Name: area.Name,
+ Incoming: area.Incoming,
+ Exec: area.Exec,
+ }
+ copy(areaNode.Id[:], area.Id[:])
+ pktName := fmt.Sprintf(
+ "area/%s/%s",
+ Base32Codec.EncodeToString(areaId[:]), msgHash,
+ )
+
+ pipeR, pipeW := io.Pipe()
+ errs := make(chan error, 1)
+ go func() {
+ errs <- jobProcess(
+ ctx,
+ pipeR,
+ pktName,
+ les,
+ &areaNode,
+ nice,
+ uint64(pktSizeWithoutEnc(int64(pktSize))),
+ "",
+ decompressor,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ )
+ }()
+ _, _, _, err = PktEncRead(
+ &areaNodeOur,
+ ctx.Neigh,
+ fullPipeR,
+ pipeW,
+ signatureVerify,
+ nil,
+ )
+ if err != nil {
+ ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
+ pipeW.CloseWithError(err)
+ <-errs
+ return err
+ }
+ pipeW.Close()
+ if err = <-errs; err != nil {
+ return err
+ }
+ }
+
+ if !dryRun && jobPath != "" {
+ if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
+ ctx.LogE("rx-area-mkdir", les, err, logMsg)
+ return err
+ }
+ if fd, err := os.Create(seenPath); err == nil {
+ fd.Close()
+ if err = DirSync(seenDir); err != nil {
+ ctx.LogE("rx-area-dirsync", les, err, logMsg)
+ return err
}
}
- default:
- ctx.LogE("rx", sds, "unknown type")
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing area %s/%s (%s): %s: removing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ msgHash,
+ )
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+
+ default:
+ err = errors.New("unknown type")
+ ctx.LogE(
+ "rx-type-unknown", les, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s)",
+ sender.Name, pktName, humanize.IBytes(pktSize),
+ )
+ },
+ )
+ return err
+ }
+ return nil
+}
+
+func (ctx *Ctx) Toss(
+ nodeId *NodeId,
+ xx TRxTx,
+ nice uint8,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+) bool {
+ dirLock, err := ctx.LockDir(nodeId, "toss")
+ if err != nil {
+ return false
+ }
+ defer ctx.UnlockDir(dirLock)
+ isBad := false
+ decompressor, err := zstd.NewReader(nil)
+ if err != nil {
+ panic(err)
+ }
+ defer decompressor.Close()
+ for job := range ctx.Jobs(nodeId, xx) {
+ pktName := filepath.Base(job.Path)
+ les := LEs{
+ {"Node", job.PktEnc.Sender},
+ {"Pkt", pktName},
+ {"Nice", int(job.PktEnc.Nice)},
+ }
+ if job.PktEnc.Nice > nice {
+ ctx.LogD("rx-too-nice", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: too nice: %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ NicenessFmt(job.PktEnc.Nice),
+ )
+ })
+ continue
+ }
+ fd, err := os.Open(job.Path)
+ if err != nil {
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: opening %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
+ )
+ })
+ isBad = true
+ continue
+ }
+ sender := ctx.Neigh[*job.PktEnc.Sender]
+ if sender == nil {
+ err := errors.New("unknown node")
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ )
+ })
+ isBad = true
+ continue
+ }
+ errs := make(chan error, 1)
+ var sharedKey []byte
+ Retry:
+ pipeR, pipeW := io.Pipe()
+ go func() {
+ errs <- jobProcess(
+ ctx,
+ pipeR,
+ pktName,
+ les,
+ sender,
+ job.PktEnc.Nice,
+ uint64(pktSizeWithoutEnc(job.Size)),
+ job.Path,
+ decompressor,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ )
+ }()
+ pipeWB := bufio.NewWriter(pipeW)
+ sharedKey, _, _, err = PktEncRead(
+ ctx.Self,
+ ctx.Neigh,
+ bufio.NewReader(fd),
+ pipeWB,
+ sharedKey == nil,
+ sharedKey,
+ )
+ if err != nil {
+ pipeW.CloseWithError(err)
+ }
+ if err := pipeWB.Flush(); err != nil {
+ pipeW.CloseWithError(err)
+ }
+ pipeW.Close()
+
+ if err != nil {
+ isBad = true
+ fd.Close()
+ <-errs
+ continue
+ }
+ if err = <-errs; err == JobRepeatProcess {
+ if _, err = fd.Seek(0, io.SeekStart); err != nil {
+ ctx.LogE("rx-seek", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: can not seek",
+ ctx.NodeName(job.PktEnc.Sender),
+ pktName,
+ )
+ })
+ isBad = true
+ break
+ }
+ goto Retry
+ } else if err != nil {
isBad = true
}
- Closing:
- pipeR.Close()
+ fd.Close()
}
return isBad
}
+
+func (ctx *Ctx) AutoToss(
+ nodeId *NodeId,
+ nice uint8,
+ doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+) (chan struct{}, chan bool) {
+ dw, err := ctx.NewDirWatcher(
+ filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
+ time.Second,
+ )
+ if err != nil {
+ log.Fatalln(err)
+ }
+ finish := make(chan struct{})
+ badCode := make(chan bool)
+ go func() {
+ bad := false
+ for {
+ select {
+ case <-finish:
+ dw.Close()
+ badCode <- bad
+ return
+ case <-dw.C:
+ bad = !ctx.Toss(
+ nodeId, TRx, nice, false,
+ doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
+ }
+ }
+ }()
+ return finish, badCode
+}