/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
pktSize uint64,
jobPath string,
decompressor *zstd.Decoder,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
) error {
defer pipeR.Close()
sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
} else {
cmd.Stdin = pipeR
}
- output, err := cmd.Output()
+ output, err := cmd.CombinedOutput()
if err != nil {
- ctx.LogE("rx-hande", les, err, func(les LEs) string {
+ les = append(les, LE{"Output", strings.Split(
+ strings.Trim(string(output), "\n"), "\n"),
+ })
+ ctx.LogE("rx-handle", les, err, func(les LEs) string {
return fmt.Sprintf(
"Tossing exec %s/%s (%s): %s: handling",
sender.Name, pktName,
})
return err
}
- if err = tmp.Sync(); err != nil {
- tmp.Close()
- ctx.LogE("rx-sync", les, err, func(les LEs) string {
- return fmt.Sprintf(
- "Tossing file %s/%s (%s): %s: syncing",
- sender.Name, pktName,
- humanize.IBytes(pktSize), dst,
- )
- })
- return err
+ if !NoSync {
+ if err = tmp.Sync(); err != nil {
+ tmp.Close()
+ ctx.LogE("rx-sync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: syncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize), dst,
+ )
+ })
+ return err
+ }
}
if err = tmp.Close(); err != nil {
ctx.LogE("rx-close", les, err, func(les LEs) string {
if err != nil {
panic(err)
}
- if _, err = ctx.Tx(
+ if _, _, _, err = ctx.Tx(
node,
pktTrns,
nice,
- int64(pktSize), 0,
+ int64(pktSize), 0, MaxFileSize,
pipeR,
pktName,
nil,
}
if nodeId != sender.Id && nodeId != pktEnc.Sender {
ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
- if _, err = ctx.Tx(
- node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
+ if _, _, _, err = ctx.Tx(
+ node,
+ &pkt,
+ nice,
+ int64(pktSize), 0, MaxFileSize,
+ fullPipeR,
+ pktName,
+ nil,
); err != nil {
ctx.LogE("rx-area", lesEcho, err, logMsgNode)
return err
uint64(pktSizeWithoutEnc(int64(pktSize))),
"",
decompressor,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
)
}()
_, _, _, err = PktEncRead(
nil,
)
if err != nil {
+ ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
pipeW.CloseWithError(err)
<-errs
return err
}
}
+ case PktTypeACK:
+ if noACK {
+ return nil
+ }
+ hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
+ les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
+ }
+ ctx.LogD("rx-ack", les, logMsg)
+ pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
+ if _, err := os.Stat(pktPath); err == nil {
+ if !dryRun {
+ if err = os.Remove(pktPath); err != nil {
+ ctx.LogE("rx-ack", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing packet"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(pktPath))
+ }
+ }
+ } else {
+ ctx.LogD("rx-ack", les, func(les LEs) string {
+ return logMsg(les) + ": already disappeared"
+ })
+ }
+ if !dryRun && doSeen {
+ if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
+ return err
+ }
+ if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
+ fd.Close()
+ if err = DirSync(filepath.Dir(jobPath)); err != nil {
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ sender.Name, pktName,
+ humanize.IBytes(pktSize),
+ filepath.Base(jobPath),
+ )
+ })
+ return err
+ }
+ }
+ }
+ if !dryRun {
+ if err = os.Remove(jobPath); err != nil {
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": removing job"
+ })
+ return err
+ } else if ctx.HdrUsage {
+ os.Remove(JobPath2Hdr(jobPath))
+ }
+ }
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
+ })
+
default:
err = errors.New("unknown type")
ctx.LogE(
nodeId *NodeId,
xx TRxTx,
nice uint8,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
) bool {
dirLock, err := ctx.LockDir(nodeId, "toss")
if err != nil {
uint64(pktSizeWithoutEnc(job.Size)),
job.Path,
decompressor,
- dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
+ dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
)
}()
pipeWB := bufio.NewWriter(pipeW)
sharedKey, _, _, err = PktEncRead(
ctx.Self,
ctx.Neigh,
- bufio.NewReader(fd),
+ bufio.NewReaderSize(fd, MTHBlockSize),
pipeWB,
sharedKey == nil,
sharedKey,
func (ctx *Ctx) AutoToss(
nodeId *NodeId,
nice uint8,
- doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
+ doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
) (chan struct{}, chan bool) {
dw, err := ctx.NewDirWatcher(
filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
case <-dw.C:
bad = !ctx.Toss(
nodeId, TRx, nice, false,
- doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
+ doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
}
}
}()