/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
import (
"bufio"
"bytes"
+ "encoding/base64"
+ "errors"
"fmt"
"io"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
+ "time"
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/dustin/go-humanize"
SeenSuffix = ".seen"
)
-func newNotification(fromTo *FromToJSON, subject string) io.Reader {
- return strings.NewReader(fmt.Sprintf(
- "From: %s\nTo: %s\nSubject: %s\n",
- fromTo.From,
- fromTo.To,
- mime.BEncoding.Encode("UTF-8", subject),
- ))
+func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
+ lines := []string{
+ "From: " + fromTo.From,
+ "To: " + fromTo.To,
+ "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
+ }
+ if len(body) > 0 {
+ lines = append(lines, []string{
+ "MIME-Version: 1.0",
+ "Content-Type: text/plain; charset=utf-8",
+ "Content-Transfer-Encoding: base64",
+ "",
+ base64.StdEncoding.EncodeToString(body),
+ }...)
+ }
+ return strings.NewReader(strings.Join(lines, "\n"))
}
func (ctx *Ctx) Toss(
nice uint8,
dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
) bool {
+ dirLock, err := ctx.LockDir(nodeId, "toss")
+ if err != nil {
+ ctx.LogE("rx", LEs{}, err, "lock")
+ return false
+ }
+ defer ctx.UnlockDir(dirLock)
isBad := false
+ sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
decompressor, err := zstd.NewReader(nil)
if err != nil {
panic(err)
defer decompressor.Close()
for job := range ctx.Jobs(nodeId, TRx) {
pktName := filepath.Base(job.Fd.Name())
- sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
+ les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
if job.PktEnc.Nice > nice {
- ctx.LogD("rx", SdsAdd(sds, SDS{
- "nice": strconv.Itoa(int(job.PktEnc.Nice)),
- }), "too nice")
+ ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
+ job.Fd.Close() // #nosec G104
continue
}
pipeR, pipeW := io.Pipe()
- errs := make(chan error, 1)
- go func(job Job) {
+ go func(job Job) error {
pipeWB := bufio.NewWriter(pipeW)
_, _, err := PktEncRead(
ctx.Self,
bufio.NewReader(job.Fd),
pipeWB,
)
- errs <- err
- pipeWB.Flush()
- pipeW.Close()
- job.Fd.Close()
+ job.Fd.Close() // #nosec G104
if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
+ return pipeW.CloseWithError(err)
+ }
+ if err = pipeWB.Flush(); err != nil {
+ return pipeW.CloseWithError(err)
}
+ return pipeW.Close()
}(job)
var pkt Pkt
var err error
var pktSize int64
var pktSizeBlocks int64
if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
+ ctx.LogE("rx", les, err, "unmarshal")
isBad = true
goto Closing
}
pktSize -= poly1305.TagSize
}
pktSize -= pktSizeBlocks * poly1305.TagSize
- sds["size"] = strconv.FormatInt(pktSize, 10)
- ctx.LogD("rx", sds, "taken")
+ les = append(les, LE{"Size", pktSize})
+ ctx.LogD("rx", les, "taken")
switch pkt.Type {
- case PktTypeExec:
+ case PktTypeExec, PktTypeExecFat:
if noExec {
goto Closing
}
for _, p := range path[1:] {
args = append(args, string(p))
}
- sds := SdsAdd(sds, SDS{
- "type": "exec",
- "dst": strings.Join(append([]string{handle}, args...), " "),
- })
+ argsStr := strings.Join(append([]string{handle}, args...), " ")
+ les = append(les, LEs{
+ {"Type", "exec"},
+ {"Dst", argsStr},
+ }...)
sender := ctx.Neigh[*job.PktEnc.Sender]
cmdline, exists := sender.Exec[handle]
if !exists || len(cmdline) == 0 {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
+ ctx.LogE("rx", les, errors.New("No handle found"), "")
isBad = true
goto Closing
}
- if err = decompressor.Reset(pipeR); err != nil {
- log.Fatalln(err)
+ if pkt.Type == PktTypeExec {
+ if err = decompressor.Reset(pipeR); err != nil {
+ log.Fatalln(err)
+ }
}
if !dryRun {
cmd := exec.Command(
cmdline[0],
- append(cmdline[1:len(cmdline)], args...)...,
+ append(cmdline[1:], args...)...,
)
cmd.Env = append(
cmd.Env,
"NNCP_SENDER="+sender.Id.String(),
"NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
)
- cmd.Stdin = decompressor
- if err = cmd.Run(); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
+ if pkt.Type == PktTypeExec {
+ cmd.Stdin = decompressor
+ } else {
+ cmd.Stdin = pipeR
+ }
+ output, err := cmd.Output()
+ if err != nil {
+ ctx.LogE("rx", les, err, "handle")
isBad = true
goto Closing
}
+ if len(sendmail) > 0 && ctx.NotifyExec != nil {
+ notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
+ if !exists {
+ notify, exists = ctx.NotifyExec["*."+handle]
+ }
+ if exists {
+ cmd := exec.Command(
+ sendmail[0],
+ append(sendmail[1:], notify.To)...,
+ )
+ cmd.Stdin = newNotification(notify, fmt.Sprintf(
+ "Exec from %s: %s", sender.Name, argsStr,
+ ), output)
+ if err = cmd.Run(); err != nil {
+ ctx.LogE("rx", les, err, "notify")
+ }
+ }
+ }
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
+ fd.Close() // #nosec G104
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
}
goto Closing
}
dst := string(pkt.Path[:int(pkt.PathLen)])
- sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
+ les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...)
if filepath.IsAbs(dst) {
- ctx.LogE("rx", sds, "non-relative destination path")
+ ctx.LogE("rx", les, errors.New("non-relative destination path"), "")
isBad = true
goto Closing
}
incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
if incoming == nil {
- ctx.LogE("rx", sds, "incoming is not allowed")
+ ctx.LogE("rx", les, errors.New("incoming is not allowed"), "")
isBad = true
goto Closing
}
dir := filepath.Join(*incoming, path.Dir(dst))
if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
+ ctx.LogE("rx", les, err, "mkdir")
isBad = true
goto Closing
}
if !dryRun {
tmp, err := TempFile(dir, "file")
if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
+ ctx.LogE("rx", les, err, "mktemp")
isBad = true
goto Closing
}
- sds["tmp"] = tmp.Name()
- ctx.LogD("rx", sds, "created")
+ les = append(les, LE{"Tmp", tmp.Name()})
+ ctx.LogD("rx", les, "created")
bufW := bufio.NewWriter(tmp)
- if _, err = io.Copy(bufW, pipeR); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+ if _, err = CopyProgressed(
+ bufW, pipeR, "Rx file",
+ append(les, LE{"FullSize", pktSize}),
+ ctx.ShowPrgrs,
+ ); err != nil {
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
if err = bufW.Flush(); err != nil {
- tmp.Close()
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+ tmp.Close() // #nosec G104
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
if err = tmp.Sync(); err != nil {
- tmp.Close()
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+ tmp.Close() // #nosec G104
+ ctx.LogE("rx", les, err, "copy")
+ isBad = true
+ goto Closing
+ }
+ if err = tmp.Close(); err != nil {
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
- tmp.Close()
dstPathOrig := filepath.Join(*incoming, dst)
dstPath := dstPathOrig
dstPathCtr := 0
if os.IsNotExist(err) {
break
}
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
+ ctx.LogE("rx", les, err, "stat")
isBad = true
goto Closing
}
dstPathCtr++
}
if err = os.Rename(tmp.Name(), dstPath); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
+ ctx.LogE("rx", les, err, "rename")
+ isBad = true
+ }
+ if err = DirSync(*incoming); err != nil {
+ ctx.LogE("rx", les, err, "sync")
isBad = true
}
- delete(sds, "tmp")
+ les = les[:len(les)-1] // delete Tmp
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
+ fd.Close() // #nosec G104
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
- sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
- if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
+ if len(sendmail) > 0 && ctx.NotifyFile != nil {
cmd := exec.Command(
sendmail[0],
- append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
+ append(sendmail[1:], ctx.NotifyFile.To)...,
)
cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
"File from %s: %s (%s)",
ctx.Neigh[*job.PktEnc.Sender].Name,
dst,
humanize.IBytes(uint64(pktSize)),
- ))
- cmd.Run()
+ ), nil)
+ if err = cmd.Run(); err != nil {
+ ctx.LogE("rx", les, err, "notify")
+ }
}
}
case PktTypeFreq:
}
src := string(pkt.Path[:int(pkt.PathLen)])
if filepath.IsAbs(src) {
- ctx.LogE("rx", sds, "non-relative source path")
+ ctx.LogE("rx", les, errors.New("non-relative source path"), "")
isBad = true
goto Closing
}
- sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
+ les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...)
dstRaw, err := ioutil.ReadAll(pipeR)
if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
+ ctx.LogE("rx", les, err, "read")
isBad = true
goto Closing
}
dst := string(dstRaw)
- sds["dst"] = dst
+ les = append(les, LE{"Dst", dst})
sender := ctx.Neigh[*job.PktEnc.Sender]
- freq := sender.Freq
- if freq == nil {
- ctx.LogE("rx", sds, "freqing is not allowed")
+ freqPath := sender.FreqPath
+ if freqPath == nil {
+ ctx.LogE("rx", les, errors.New("freqing is not allowed"), "")
isBad = true
goto Closing
}
if !dryRun {
- if sender.FreqChunked == 0 {
- err = ctx.TxFile(
- sender,
- pkt.Nice,
- filepath.Join(*freq, src),
- dst,
- sender.FreqMinSize,
- )
- } else {
- err = ctx.TxFileChunked(
- sender,
- pkt.Nice,
- filepath.Join(*freq, src),
- dst,
- sender.FreqMinSize,
- sender.FreqChunked,
- )
- }
+ err = ctx.TxFile(
+ sender,
+ pkt.Nice,
+ filepath.Join(*freqPath, src),
+ dst,
+ sender.FreqChunked,
+ sender.FreqMinSize,
+ sender.FreqMaxSize,
+ )
if err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
+ ctx.LogE("rx", les, err, "tx file")
isBad = true
goto Closing
}
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
+ fd.Close() // #nosec G104
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
- sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
- if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
+ if len(sendmail) > 0 && ctx.NotifyFreq != nil {
cmd := exec.Command(
sendmail[0],
- append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
+ append(sendmail[1:], ctx.NotifyFreq.To)...,
)
cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
- "Freq from %s: %s",
- ctx.Neigh[*job.PktEnc.Sender].Name,
- src,
- ))
- cmd.Run()
+ "Freq from %s: %s", sender.Name, src,
+ ), nil)
+ if err = cmd.Run(); err != nil {
+ ctx.LogE("rx", les, err, "notify")
+ }
}
}
case PktTypeTrns:
copy(dst[:], pkt.Path[:int(pkt.PathLen)])
nodeId := NodeId(*dst)
node, known := ctx.Neigh[nodeId]
- sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
+ les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...)
if !known {
- ctx.LogE("rx", sds, "unknown node")
+ ctx.LogE("rx", les, errors.New("unknown node"), "")
isBad = true
goto Closing
}
- ctx.LogD("rx", sds, "taken")
+ ctx.LogD("rx", les, "taken")
if !dryRun {
if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
+ ctx.LogE("rx", les, err, "tx trns")
isBad = true
goto Closing
}
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
- fd.Close()
+ fd.Close() // #nosec G104
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
}
default:
- ctx.LogE("rx", sds, "unknown type")
+ ctx.LogE("rx", les, errors.New("unknown type"), "")
isBad = true
}
Closing:
- pipeR.Close()
+ pipeR.Close() // #nosec G104
}
return isBad
}
+
+func (ctx *Ctx) AutoToss(
+ nodeId *NodeId,
+ nice uint8,
+ doSeen, noFile, noFreq, noExec, noTrns bool,
+) (chan struct{}, chan bool) {
+ finish := make(chan struct{})
+ badCode := make(chan bool)
+ go func() {
+ bad := false
+ for {
+ select {
+ case <-finish:
+ badCode <- bad
+ break
+ default:
+ }
+ time.Sleep(time.Second)
+ bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns)
+ }
+ }()
+ return finish, badCode
+}