/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
"errors"
"fmt"
"io"
- "io/ioutil"
+ "io/fs"
"log"
"mime"
"os"
)
const (
- SeenSuffix = ".seen"
+ SeenDir = "seen"
+ ACKDir = "ack"
)
+type TossOpts struct {
+ Nice uint8
+ DryRun bool
+ DoSeen bool
+ NoFile bool
+ NoFreq bool
+ NoExec bool
+ NoTrns bool
+ NoArea bool
+ NoACK bool
+ GenACK bool
+}
+
+func jobPath2Seen(jobPath string) string {
+ return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
+}
+
func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
lines := []string{
"From: " + fromTo.From,
pktSize uint64,
jobPath string,
decompressor *zstd.Decoder,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ opts *TossOpts,
) error {
defer pipeR.Close()
sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
humanize.IBytes(pktSize),
)
})
+ if opts.GenACK && pkt.Type != PktTypeACK {
+ newPktName, err := ctx.TxACK(
+ sender, sender.ACKNice, pktName, sender.ACKMinSize,
+ )
+ if err != nil {
+ ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
+ return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName)
+ })
+ return err
+ }
+ ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir)
+ os.MkdirAll(ackDir, os.FileMode(0777))
+ if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil {
+ fd.Close()
+ if err = DirSync(ackDir); err != nil {
+ ctx.LogE("rx-genack", les, err, func(les LEs) string {
+ return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
+ })
+ return err
+ }
+ } else {
+ ctx.LogE("rx-genack", les, err, func(les LEs) string {
+ return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
+ })
+ return err
+ }
+ }
switch pkt.Type {
case PktTypeExec, PktTypeExecFat:
- if noExec {
+ if opts.NoExec {
return nil
}
path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
log.Fatalln(err)
}
}
- if !dryRun {
+ if !opts.DryRun {
cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
cmd.Env = append(
cmd.Env,
} else {
cmd.Stdin = pipeR
}
- output, err := cmd.Output()
+ output, err := cmd.CombinedOutput()
if err != nil {
- ctx.LogE("rx-hande", les, err, func(les LEs) string {
+ les = append(les, LE{"Output", strings.Split(
+ strings.Trim(string(output), "\n"), "\n"),
+ })
+ ctx.LogE("rx-handle", les, err, func(les LEs) string {
return fmt.Sprintf(
"Tossing exec %s/%s (%s): %s: handling",
sender.Name, pktName,
humanize.IBytes(pktSize),
)
})
- if !dryRun && jobPath != "" {
- if doSeen {
- if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+ if !opts.DryRun && jobPath != "" {
+ if opts.DoSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
fd.Close()
if err = DirSync(filepath.Dir(jobPath)); err != nil {
ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
})
return err
} else if ctx.HdrUsage {
- os.Remove(jobPath + HdrSuffix)
+ os.Remove(JobPath2Hdr(jobPath))
}
}
case PktTypeFile:
- if noFile {
+ if opts.NoFile {
return nil
}
dst := string(pkt.Path[:int(pkt.PathLen)])
})
return err
}
- if !dryRun {
+ if !opts.DryRun {
tmp, err := TempFile(dir, "file")
if err != nil {
ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
})
return err
}
- if err = tmp.Sync(); err != nil {
- tmp.Close()
- ctx.LogE("rx-sync", les, err, func(les LEs) string {
- return fmt.Sprintf(
- "Tossing file %s/%s (%s): %s: syncing",
- sender.Name, pktName,
- humanize.IBytes(pktSize), dst,
- )
- })
- return err
+ if !NoSync {
+ if err = tmp.Sync(); err != nil {
+ tmp.Close()
+ ctx.LogE("rx-sync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: syncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
}
if err = tmp.Close(); err != nil {
ctx.LogE("rx-close", les, err, func(les LEs) string {
dstPathCtr := 0
for {
if _, err = os.Stat(dstPath); err != nil {
- if os.IsNotExist(err) {
+ if errors.Is(err, fs.ErrNotExist) {
break
}
ctx.LogE("rx-stat", les, err, func(les LEs) string {
dst, humanize.IBytes(pktSize), sender.Name,
)
})
- if !dryRun {
+ if !opts.DryRun {
if jobPath != "" {
- if doSeen {
- if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+ if opts.DoSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
fd.Close()
if err = DirSync(filepath.Dir(jobPath)); err != nil {
ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
})
return err
} else if ctx.HdrUsage {
- os.Remove(jobPath + HdrSuffix)
+ os.Remove(JobPath2Hdr(jobPath))
}
}
if len(sendmail) > 0 && ctx.NotifyFile != nil {
}
case PktTypeFreq:
- if noFreq {
+ if opts.NoFreq {
return nil
}
src := string(pkt.Path[:int(pkt.PathLen)])
)
return err
}
- dstRaw, err := ioutil.ReadAll(pipeR)
+ dstRaw, err := io.ReadAll(pipeR)
if err != nil {
ctx.LogE("rx-read", les, err, func(les LEs) string {
return fmt.Sprintf(
)
return err
}
- if !dryRun {
+ if !opts.DryRun {
err = ctx.TxFile(
sender,
pkt.Nice,
ctx.LogI("rx", les, func(les LEs) string {
return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
})
- if !dryRun {
+ if !opts.DryRun {
if jobPath != "" {
- if doSeen {
- if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+ if opts.DoSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
fd.Close()
if err = DirSync(filepath.Dir(jobPath)); err != nil {
ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
})
return err
} else if ctx.HdrUsage {
- os.Remove(jobPath + HdrSuffix)
+ os.Remove(JobPath2Hdr(jobPath))
}
}
if len(sendmail) > 0 && ctx.NotifyFreq != nil {
}
case PktTypeTrns:
- if noTrns {
+ if opts.NoTrns {
return nil
}
dst := new([MTHSize]byte)
return err
}
ctx.LogD("rx-tx", les, logMsg)
- if !dryRun {
+ if !opts.DryRun {
if len(node.Via) == 0 {
if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
ctx.LogE("rx", les, err, func(les LEs) string {
if err != nil {
panic(err)
}
- if _, err = ctx.Tx(
+ if _, _, _, err = ctx.Tx(
node,
pktTrns,
nice,
- int64(pktSize), 0,
+ int64(pktSize), 0, MaxFileSize,
pipeR,
pktName,
nil,
humanize.IBytes(pktSize),
)
})
- if !dryRun && jobPath != "" {
- if doSeen {
- if fd, err := os.Create(jobPath + SeenSuffix); err == nil {
+ if !opts.DryRun && jobPath != "" {
+ if opts.DoSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
fd.Close()
if err = DirSync(filepath.Dir(jobPath)); err != nil {
ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
})
return err
} else if ctx.HdrUsage {
- os.Remove(jobPath + HdrSuffix)
+ os.Remove(JobPath2Hdr(jobPath))
}
}
case PktTypeArea:
- if noArea {
+ if opts.NoArea {
return nil
}
areaId := new(AreaId)
les = append(les, LE{"AreaMsg", msgHash})
ctx.LogD("rx-area", les, logMsg)
- if dryRun {
+ if opts.DryRun {
for _, nodeId := range area.Subs {
node := ctx.Neigh[*nodeId]
lesEcho := append(les, LE{"Echo", nodeId})
seenDir := filepath.Join(
ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
)
- seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+ seenPath := filepath.Join(seenDir, msgHash)
logMsgNode := func(les LEs) string {
return fmt.Sprintf(
"%s: echoing to: %s", logMsg(les), node.Name,
seenDir := filepath.Join(
ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
)
- seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+ seenPath := filepath.Join(seenDir, msgHash)
logMsgNode := func(les LEs) string {
return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
}
}
if nodeId != sender.Id && nodeId != pktEnc.Sender {
ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
- if _, err = ctx.Tx(
- node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
+ if _, _, _, err = ctx.Tx(
+ node,
+ &pkt,
+ nice,
+ int64(pktSize), 0, MaxFileSize,
+ fullPipeR,
+ pktName,
+ nil,
); err != nil {
ctx.LogE("rx-area", lesEcho, err, logMsgNode)
return err
seenDir := filepath.Join(
ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
)
- seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+ seenPath := filepath.Join(seenDir, msgHash)
if _, err := os.Stat(seenPath); err == nil {
ctx.LogD("rx-area-seen", les, func(les LEs) string {
return logMsg(les) + ": already seen"
})
- if !dryRun && jobPath != "" {
+ if !opts.DryRun && jobPath != "" {
if err = os.Remove(jobPath); err != nil {
ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
return fmt.Sprintf(
})
return err
} else if ctx.HdrUsage {
- os.Remove(jobPath + HdrSuffix)
+ os.Remove(JobPath2Hdr(jobPath))
}
}
return nil
uint64(pktSizeWithoutEnc(int64(pktSize))),
"",
decompressor,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ opts,
)
}()
_, _, _, err = PktEncRead(
nil,
)
if err != nil {
+ ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
pipeW.CloseWithError(err)
<-errs
return err
}
}
- if !dryRun && jobPath != "" {
+ if !opts.DryRun && jobPath != "" {
if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
ctx.LogE("rx-area-mkdir", les, err, logMsg)
return err
})
return err
} else if ctx.HdrUsage {
- os.Remove(jobPath + HdrSuffix)
+ os.Remove(JobPath2Hdr(jobPath))
}
}
+ case PktTypeACK:
+ if opts.NoACK {
+ return nil
+ }
+ hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
+ les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
+ }
+ ctx.LogD("rx-ack", les, logMsg)
+ pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
+ if _, err := os.Stat(pktPath); err == nil {
+ if !opts.DryRun {
+ if err = os.Remove(pktPath); err != nil {
+ ctx.LogE("rx-ack", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing packet"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(pktPath))
+ }
+ }
+ } else {
+ ctx.LogD("rx-ack", les, func(les LEs) string {
+ return logMsg(les) + ": already disappeared"
+ })
+ }
+ if !opts.DryRun && opts.DoSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+ fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
+ }
+ }
+ }
+ if !opts.DryRun {
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing job"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+ })
+
default:
err = errors.New("unknown type")
ctx.LogE(
return nil
}
-func (ctx *Ctx) Toss(
- nodeId *NodeId,
- xx TRxTx,
- nice uint8,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
-) bool {
+func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
dirLock, err := ctx.LockDir(nodeId, "toss")
if err != nil {
return false
{"Pkt", pktName},
{"Nice", int(job.PktEnc.Nice)},
}
- if job.PktEnc.Nice > nice {
+ if job.PktEnc.Nice > opts.Nice {
ctx.LogD("rx-too-nice", les, func(les LEs) string {
return fmt.Sprintf(
"Tossing %s/%s: too nice: %s",
uint64(pktSizeWithoutEnc(job.Size)),
job.Path,
decompressor,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ opts,
)
}()
pipeWB := bufio.NewWriter(pipeW)
sharedKey, _, _, err = PktEncRead(
ctx.Self,
ctx.Neigh,
- bufio.NewReader(fd),
+ bufio.NewReaderSize(fd, MTHBlockSize),
pipeWB,
sharedKey == nil,
sharedKey,
return isBad
}
-func (ctx *Ctx) AutoToss(
- nodeId *NodeId,
- nice uint8,
- doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
-) (chan struct{}, chan bool) {
+func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
dw, err := ctx.NewDirWatcher(
filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
time.Second,
badCode <- bad
return
case <-dw.C:
- bad = !ctx.Toss(
- nodeId, TRx, nice, false,
- doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
+ bad = !ctx.Toss(nodeId, TRx, opts) || bad
}
}
}()