2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
49 func jobPath2Seen(jobPath string) string {
50 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
55 "From: " + fromTo.From,
57 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
63 "Content-Type: text/plain; charset=utf-8",
64 "Content-Transfer-Encoding: base64",
66 base64.StdEncoding.EncodeToString(body),
69 return strings.NewReader(strings.Join(lines, "\n"))
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76 pktSize -= poly1305.TagSize
78 pktSize -= pktSizeBlocks * poly1305.TagSize
82 var JobRepeatProcess = errors.New("needs processing repeat")
93 decompressor *zstd.Decoder,
94 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
97 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
99 _, err := xdr.Unmarshal(pipeR, &pkt)
101 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
106 les = append(les, LE{"Size", int64(pktSize)})
107 ctx.LogD("rx", les, func(les LEs) string {
109 "Tossing %s/%s (%s)",
110 sender.Name, pktName,
111 humanize.IBytes(pktSize),
115 case PktTypeExec, PktTypeExecFat:
119 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120 handle := string(path[0])
121 args := make([]string, 0, len(path)-1)
122 for _, p := range path[1:] {
123 args = append(args, string(p))
125 argsStr := strings.Join(append([]string{handle}, args...), " ")
126 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
127 cmdline := sender.Exec[handle]
128 if len(cmdline) == 0 {
129 err = errors.New("No handle found")
131 "rx-no-handle", les, err,
132 func(les LEs) string {
134 "Tossing exec %s/%s (%s): %s",
135 sender.Name, pktName,
136 humanize.IBytes(pktSize), argsStr,
142 if pkt.Type == PktTypeExec {
143 if err = decompressor.Reset(pipeR); err != nil {
148 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
151 "NNCP_SELF="+ctx.Self.Id.String(),
152 "NNCP_SENDER="+sender.Id.String(),
153 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
155 if pkt.Type == PktTypeExec {
156 cmd.Stdin = decompressor
160 output, err := cmd.CombinedOutput()
162 les = append(les, LE{"Output", strings.Split(
163 strings.Trim(string(output), "\n"), "\n"),
165 ctx.LogE("rx-handle", les, err, func(les LEs) string {
167 "Tossing exec %s/%s (%s): %s: handling",
168 sender.Name, pktName,
169 humanize.IBytes(uint64(pktSize)), argsStr,
174 if len(sendmail) > 0 && ctx.NotifyExec != nil {
175 notify := ctx.NotifyExec[sender.Name+"."+handle]
177 notify = ctx.NotifyExec["*."+handle]
182 append(sendmail[1:], notify.To)...,
184 cmd.Stdin = newNotification(notify, fmt.Sprintf(
185 "Exec from %s: %s", sender.Name, argsStr,
187 if err = cmd.Run(); err != nil {
188 ctx.LogE("rx-notify", les, err, func(les LEs) string {
190 "Tossing exec %s/%s (%s): %s: notifying",
191 sender.Name, pktName,
192 humanize.IBytes(pktSize), argsStr,
199 ctx.LogI("rx", les, func(les LEs) string {
201 "Got exec from %s to %s (%s)",
202 sender.Name, argsStr,
203 humanize.IBytes(pktSize),
206 if !dryRun && jobPath != "" {
208 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
211 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
213 if err = DirSync(filepath.Dir(jobPath)); err != nil {
214 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
216 "Tossing file %s/%s (%s): %s: dirsyncing",
217 sender.Name, pktName,
218 humanize.IBytes(pktSize),
219 filepath.Base(jobPath),
226 if err = os.Remove(jobPath); err != nil {
227 ctx.LogE("rx-notify", les, err, func(les LEs) string {
229 "Tossing exec %s/%s (%s): %s: notifying",
230 sender.Name, pktName,
231 humanize.IBytes(pktSize), argsStr,
235 } else if ctx.HdrUsage {
236 os.Remove(JobPath2Hdr(jobPath))
244 dst := string(pkt.Path[:int(pkt.PathLen)])
245 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
246 if filepath.IsAbs(dst) {
247 err = errors.New("non-relative destination path")
249 "rx-non-rel", les, err,
250 func(les LEs) string {
252 "Tossing file %s/%s (%s): %s",
253 sender.Name, pktName,
254 humanize.IBytes(pktSize), dst,
260 incoming := sender.Incoming
262 err = errors.New("incoming is not allowed")
264 "rx-no-incoming", les, err,
265 func(les LEs) string {
267 "Tossing file %s/%s (%s): %s",
268 sender.Name, pktName,
269 humanize.IBytes(pktSize), dst,
275 dir := filepath.Join(*incoming, path.Dir(dst))
276 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
277 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
279 "Tossing file %s/%s (%s): %s: mkdir",
280 sender.Name, pktName,
281 humanize.IBytes(pktSize), dst,
287 tmp, err := TempFile(dir, "file")
289 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
291 "Tossing file %s/%s (%s): %s: mktemp",
292 sender.Name, pktName,
293 humanize.IBytes(pktSize), dst,
298 les = append(les, LE{"Tmp", tmp.Name()})
299 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
301 "Tossing file %s/%s (%s): %s: created: %s",
302 sender.Name, pktName,
303 humanize.IBytes(pktSize), dst, tmp.Name(),
306 bufW := bufio.NewWriter(tmp)
307 if _, err = CopyProgressed(
308 bufW, pipeR, "Rx file",
309 append(les, LE{"FullSize", int64(pktSize)}),
312 ctx.LogE("rx-copy", les, err, func(les LEs) string {
314 "Tossing file %s/%s (%s): %s: copying",
315 sender.Name, pktName,
316 humanize.IBytes(pktSize), dst,
321 if err = bufW.Flush(); err != nil {
323 ctx.LogE("rx-flush", les, err, func(les LEs) string {
325 "Tossing file %s/%s (%s): %s: flushing",
326 sender.Name, pktName,
327 humanize.IBytes(pktSize), dst,
332 if err = tmp.Sync(); err != nil {
334 ctx.LogE("rx-sync", les, err, func(les LEs) string {
336 "Tossing file %s/%s (%s): %s: syncing",
337 sender.Name, pktName,
338 humanize.IBytes(pktSize), dst,
343 if err = tmp.Close(); err != nil {
344 ctx.LogE("rx-close", les, err, func(les LEs) string {
346 "Tossing file %s/%s (%s): %s: closing",
347 sender.Name, pktName,
348 humanize.IBytes(pktSize), dst,
353 dstPathOrig := filepath.Join(*incoming, dst)
354 dstPath := dstPathOrig
357 if _, err = os.Stat(dstPath); err != nil {
358 if os.IsNotExist(err) {
361 ctx.LogE("rx-stat", les, err, func(les LEs) string {
363 "Tossing file %s/%s (%s): %s: stating: %s",
364 sender.Name, pktName,
365 humanize.IBytes(pktSize), dst, dstPath,
370 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
373 if err = os.Rename(tmp.Name(), dstPath); err != nil {
374 ctx.LogE("rx-rename", les, err, func(les LEs) string {
376 "Tossing file %s/%s (%s): %s: renaming",
377 sender.Name, pktName,
378 humanize.IBytes(pktSize), dst,
383 if err = DirSync(*incoming); err != nil {
384 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
386 "Tossing file %s/%s (%s): %s: dirsyncing",
387 sender.Name, pktName,
388 humanize.IBytes(pktSize), dst,
393 les = les[:len(les)-1] // delete Tmp
395 ctx.LogI("rx", les, func(les LEs) string {
397 "Got file %s (%s) from %s",
398 dst, humanize.IBytes(pktSize), sender.Name,
404 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
407 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
409 if err = DirSync(filepath.Dir(jobPath)); err != nil {
410 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
412 "Tossing file %s/%s (%s): %s: dirsyncing",
413 sender.Name, pktName,
414 humanize.IBytes(pktSize),
415 filepath.Base(jobPath),
422 if err = os.Remove(jobPath); err != nil {
423 ctx.LogE("rx-remove", les, err, func(les LEs) string {
425 "Tossing file %s/%s (%s): %s: removing",
426 sender.Name, pktName,
427 humanize.IBytes(pktSize), dst,
431 } else if ctx.HdrUsage {
432 os.Remove(JobPath2Hdr(jobPath))
435 if len(sendmail) > 0 && ctx.NotifyFile != nil {
438 append(sendmail[1:], ctx.NotifyFile.To)...,
440 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
441 "File from %s: %s (%s)",
442 sender.Name, dst, humanize.IBytes(pktSize),
444 if err = cmd.Run(); err != nil {
445 ctx.LogE("rx-notify", les, err, func(les LEs) string {
447 "Tossing file %s/%s (%s): %s: notifying",
448 sender.Name, pktName,
449 humanize.IBytes(pktSize), dst,
460 src := string(pkt.Path[:int(pkt.PathLen)])
461 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
462 if filepath.IsAbs(src) {
463 err = errors.New("non-relative source path")
465 "rx-non-rel", les, err,
466 func(les LEs) string {
468 "Tossing freq %s/%s (%s): %s: notifying",
469 sender.Name, pktName,
470 humanize.IBytes(pktSize), src,
476 dstRaw, err := ioutil.ReadAll(pipeR)
478 ctx.LogE("rx-read", les, err, func(les LEs) string {
480 "Tossing freq %s/%s (%s): %s: reading",
481 sender.Name, pktName,
482 humanize.IBytes(pktSize), src,
487 dst := string(dstRaw)
488 les = append(les, LE{"Dst", dst})
489 freqPath := sender.FreqPath
491 err = errors.New("freqing is not allowed")
493 "rx-no-freq", les, err,
494 func(les LEs) string {
496 "Tossing freq %s/%s (%s): %s -> %s",
497 sender.Name, pktName,
498 humanize.IBytes(pktSize), src, dst,
508 filepath.Join(*freqPath, src),
516 ctx.LogE("rx-tx", les, err, func(les LEs) string {
518 "Tossing freq %s/%s (%s): %s -> %s: txing",
519 sender.Name, pktName,
520 humanize.IBytes(pktSize), src, dst,
526 ctx.LogI("rx", les, func(les LEs) string {
527 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
532 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
535 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
537 if err = DirSync(filepath.Dir(jobPath)); err != nil {
538 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
540 "Tossing file %s/%s (%s): %s: dirsyncing",
541 sender.Name, pktName,
542 humanize.IBytes(pktSize),
543 filepath.Base(jobPath),
550 if err = os.Remove(jobPath); err != nil {
551 ctx.LogE("rx-remove", les, err, func(les LEs) string {
553 "Tossing freq %s/%s (%s): %s -> %s: removing",
554 sender.Name, pktName,
555 humanize.IBytes(pktSize), src, dst,
559 } else if ctx.HdrUsage {
560 os.Remove(JobPath2Hdr(jobPath))
563 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
566 append(sendmail[1:], ctx.NotifyFreq.To)...,
568 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
569 "Freq from %s: %s", sender.Name, src,
571 if err = cmd.Run(); err != nil {
572 ctx.LogE("rx-notify", les, err, func(les LEs) string {
574 "Tossing freq %s/%s (%s): %s -> %s: notifying",
575 sender.Name, pktName,
576 humanize.IBytes(pktSize), src, dst,
587 dst := new([MTHSize]byte)
588 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
589 nodeId := NodeId(*dst)
590 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
591 logMsg := func(les LEs) string {
593 "Tossing trns %s/%s (%s): %s",
594 sender.Name, pktName,
595 humanize.IBytes(pktSize),
599 node := ctx.Neigh[nodeId]
601 err = errors.New("unknown node")
602 ctx.LogE("rx-unknown", les, err, logMsg)
605 ctx.LogD("rx-tx", les, logMsg)
607 if len(node.Via) == 0 {
608 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
609 ctx.LogE("rx", les, err, func(les LEs) string {
610 return logMsg(les) + ": txing"
615 via := node.Via[:len(node.Via)-1]
616 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
617 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
618 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
622 if _, _, err = ctx.Tx(
626 int64(pktSize), 0, MaxFileSize,
631 ctx.LogE("rx", les, err, func(les LEs) string {
632 return logMsg(les) + ": txing"
638 ctx.LogI("rx", les, func(les LEs) string {
640 "Got transitional packet from %s to %s (%s)",
642 ctx.NodeName(&nodeId),
643 humanize.IBytes(pktSize),
646 if !dryRun && jobPath != "" {
648 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
651 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
653 if err = DirSync(filepath.Dir(jobPath)); err != nil {
654 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
656 "Tossing file %s/%s (%s): %s: dirsyncing",
657 sender.Name, pktName,
658 humanize.IBytes(pktSize),
659 filepath.Base(jobPath),
666 if err = os.Remove(jobPath); err != nil {
667 ctx.LogE("rx", les, err, func(les LEs) string {
669 "Tossing trns %s/%s (%s): %s: removing",
670 sender.Name, pktName,
671 humanize.IBytes(pktSize),
672 ctx.NodeName(&nodeId),
676 } else if ctx.HdrUsage {
677 os.Remove(JobPath2Hdr(jobPath))
685 areaId := new(AreaId)
686 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
687 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
688 logMsg := func(les LEs) string {
690 "Tossing %s/%s (%s): area %s",
691 sender.Name, pktName,
692 humanize.IBytes(pktSize),
693 ctx.AreaName(areaId),
696 area := ctx.AreaId2Area[*areaId]
698 err = errors.New("unknown area")
699 ctx.LogE("rx-area-unknown", les, err, logMsg)
702 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
703 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
705 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
708 msgHashRaw := blake2b.Sum256(pktEncRaw)
709 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
710 les = append(les, LE{"AreaMsg", msgHash})
711 ctx.LogD("rx-area", les, logMsg)
714 for _, nodeId := range area.Subs {
715 node := ctx.Neigh[*nodeId]
716 lesEcho := append(les, LE{"Echo", nodeId})
717 seenDir := filepath.Join(
718 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
720 seenPath := filepath.Join(seenDir, msgHash)
721 logMsgNode := func(les LEs) string {
723 "%s: echoing to: %s", logMsg(les), node.Name,
726 if _, err := os.Stat(seenPath); err == nil {
727 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
728 return logMsgNode(les) + ": already sent"
732 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
735 for _, nodeId := range area.Subs {
736 node := ctx.Neigh[*nodeId]
737 lesEcho := append(les, LE{"Echo", nodeId})
738 seenDir := filepath.Join(
739 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
741 seenPath := filepath.Join(seenDir, msgHash)
742 logMsgNode := func(les LEs) string {
743 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
745 if _, err := os.Stat(seenPath); err == nil {
746 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
747 return logMsgNode(les) + ": already sent"
751 if nodeId != sender.Id && nodeId != pktEnc.Sender {
752 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
753 if _, _, err = ctx.Tx(
757 int64(pktSize), 0, MaxFileSize,
762 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
766 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
767 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
770 if fd, err := os.Create(seenPath); err == nil {
772 if err = DirSync(seenDir); err != nil {
773 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
777 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
780 return JobRepeatProcess
784 seenDir := filepath.Join(
785 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
787 seenPath := filepath.Join(seenDir, msgHash)
788 if _, err := os.Stat(seenPath); err == nil {
789 ctx.LogD("rx-area-seen", les, func(les LEs) string {
790 return logMsg(les) + ": already seen"
792 if !dryRun && jobPath != "" {
793 if err = os.Remove(jobPath); err != nil {
794 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
796 "Tossing area %s/%s (%s): %s: removing",
797 sender.Name, pktName,
798 humanize.IBytes(pktSize),
803 } else if ctx.HdrUsage {
804 os.Remove(JobPath2Hdr(jobPath))
811 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
812 return logMsg(les) + ": no private key for decoding"
815 signatureVerify := true
816 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
817 if !area.AllowUnknown {
818 err = errors.New("unknown sender")
821 append(les, LE{"Sender", pktEnc.Sender}),
823 func(les LEs) string {
824 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
829 signatureVerify = false
831 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
832 copy(areaNodeOur.Id[:], area.Id[:])
833 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
837 Incoming: area.Incoming,
840 copy(areaNode.Id[:], area.Id[:])
841 pktName := fmt.Sprintf(
843 Base32Codec.EncodeToString(areaId[:]), msgHash,
846 pipeR, pipeW := io.Pipe()
847 errs := make(chan error, 1)
856 uint64(pktSizeWithoutEnc(int64(pktSize))),
859 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
862 _, _, _, err = PktEncRead(
871 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
872 pipeW.CloseWithError(err)
877 if err = <-errs; err != nil {
882 if !dryRun && jobPath != "" {
883 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
884 ctx.LogE("rx-area-mkdir", les, err, logMsg)
887 if fd, err := os.Create(seenPath); err == nil {
889 if err = DirSync(seenDir); err != nil {
890 ctx.LogE("rx-area-dirsync", les, err, logMsg)
894 if err = os.Remove(jobPath); err != nil {
895 ctx.LogE("rx", les, err, func(les LEs) string {
897 "Tossing area %s/%s (%s): %s: removing",
898 sender.Name, pktName,
899 humanize.IBytes(pktSize),
904 } else if ctx.HdrUsage {
905 os.Remove(JobPath2Hdr(jobPath))
910 err = errors.New("unknown type")
912 "rx-type-unknown", les, err,
913 func(les LEs) string {
915 "Tossing %s/%s (%s)",
916 sender.Name, pktName, humanize.IBytes(pktSize),
925 func (ctx *Ctx) Toss(
929 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
931 dirLock, err := ctx.LockDir(nodeId, "toss")
935 defer ctx.UnlockDir(dirLock)
937 decompressor, err := zstd.NewReader(nil)
941 defer decompressor.Close()
942 for job := range ctx.Jobs(nodeId, xx) {
943 pktName := filepath.Base(job.Path)
945 {"Node", job.PktEnc.Sender},
947 {"Nice", int(job.PktEnc.Nice)},
949 if job.PktEnc.Nice > nice {
950 ctx.LogD("rx-too-nice", les, func(les LEs) string {
952 "Tossing %s/%s: too nice: %s",
953 ctx.NodeName(job.PktEnc.Sender), pktName,
954 NicenessFmt(job.PktEnc.Nice),
959 fd, err := os.Open(job.Path)
961 ctx.LogE("rx-open", les, err, func(les LEs) string {
963 "Tossing %s/%s: opening %s",
964 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
970 sender := ctx.Neigh[*job.PktEnc.Sender]
972 err := errors.New("unknown node")
973 ctx.LogE("rx-open", les, err, func(les LEs) string {
976 ctx.NodeName(job.PktEnc.Sender), pktName,
982 errs := make(chan error, 1)
985 pipeR, pipeW := io.Pipe()
994 uint64(pktSizeWithoutEnc(job.Size)),
997 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
1000 pipeWB := bufio.NewWriter(pipeW)
1001 sharedKey, _, _, err = PktEncRead(
1004 bufio.NewReader(fd),
1010 pipeW.CloseWithError(err)
1012 if err := pipeWB.Flush(); err != nil {
1013 pipeW.CloseWithError(err)
1023 if err = <-errs; err == JobRepeatProcess {
1024 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1025 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1027 "Tossing %s/%s: can not seek",
1028 ctx.NodeName(job.PktEnc.Sender),
1036 } else if err != nil {
1044 func (ctx *Ctx) AutoToss(
1047 doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
1048 ) (chan struct{}, chan bool) {
1049 dw, err := ctx.NewDirWatcher(
1050 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1056 finish := make(chan struct{})
1057 badCode := make(chan bool)
1068 nodeId, TRx, nice, false,
1069 doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1073 return finish, badCode