2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
49 func jobPath2Seen(jobPath string) string {
50 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
55 "From: " + fromTo.From,
57 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
63 "Content-Type: text/plain; charset=utf-8",
64 "Content-Transfer-Encoding: base64",
66 base64.StdEncoding.EncodeToString(body),
69 return strings.NewReader(strings.Join(lines, "\n"))
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76 pktSize -= poly1305.TagSize
78 pktSize -= pktSizeBlocks * poly1305.TagSize
82 var JobRepeatProcess = errors.New("needs processing repeat")
93 decompressor *zstd.Decoder,
94 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
97 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
99 _, err := xdr.Unmarshal(pipeR, &pkt)
101 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
106 les = append(les, LE{"Size", int64(pktSize)})
107 ctx.LogD("rx", les, func(les LEs) string {
109 "Tossing %s/%s (%s)",
110 sender.Name, pktName,
111 humanize.IBytes(pktSize),
115 case PktTypeExec, PktTypeExecFat:
119 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120 handle := string(path[0])
121 args := make([]string, 0, len(path)-1)
122 for _, p := range path[1:] {
123 args = append(args, string(p))
125 argsStr := strings.Join(append([]string{handle}, args...), " ")
126 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
127 cmdline := sender.Exec[handle]
128 if len(cmdline) == 0 {
129 err = errors.New("No handle found")
131 "rx-no-handle", les, err,
132 func(les LEs) string {
134 "Tossing exec %s/%s (%s): %s",
135 sender.Name, pktName,
136 humanize.IBytes(pktSize), argsStr,
142 if pkt.Type == PktTypeExec {
143 if err = decompressor.Reset(pipeR); err != nil {
148 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
151 "NNCP_SELF="+ctx.Self.Id.String(),
152 "NNCP_SENDER="+sender.Id.String(),
153 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
155 if pkt.Type == PktTypeExec {
156 cmd.Stdin = decompressor
160 output, err := cmd.Output()
162 ctx.LogE("rx-hande", les, err, func(les LEs) string {
164 "Tossing exec %s/%s (%s): %s: handling",
165 sender.Name, pktName,
166 humanize.IBytes(uint64(pktSize)), argsStr,
171 if len(sendmail) > 0 && ctx.NotifyExec != nil {
172 notify := ctx.NotifyExec[sender.Name+"."+handle]
174 notify = ctx.NotifyExec["*."+handle]
179 append(sendmail[1:], notify.To)...,
181 cmd.Stdin = newNotification(notify, fmt.Sprintf(
182 "Exec from %s: %s", sender.Name, argsStr,
184 if err = cmd.Run(); err != nil {
185 ctx.LogE("rx-notify", les, err, func(les LEs) string {
187 "Tossing exec %s/%s (%s): %s: notifying",
188 sender.Name, pktName,
189 humanize.IBytes(pktSize), argsStr,
196 ctx.LogI("rx", les, func(les LEs) string {
198 "Got exec from %s to %s (%s)",
199 sender.Name, argsStr,
200 humanize.IBytes(pktSize),
203 if !dryRun && jobPath != "" {
205 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
208 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
210 if err = DirSync(filepath.Dir(jobPath)); err != nil {
211 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
213 "Tossing file %s/%s (%s): %s: dirsyncing",
214 sender.Name, pktName,
215 humanize.IBytes(pktSize),
216 filepath.Base(jobPath),
223 if err = os.Remove(jobPath); err != nil {
224 ctx.LogE("rx-notify", les, err, func(les LEs) string {
226 "Tossing exec %s/%s (%s): %s: notifying",
227 sender.Name, pktName,
228 humanize.IBytes(pktSize), argsStr,
232 } else if ctx.HdrUsage {
233 os.Remove(JobPath2Hdr(jobPath))
241 dst := string(pkt.Path[:int(pkt.PathLen)])
242 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
243 if filepath.IsAbs(dst) {
244 err = errors.New("non-relative destination path")
246 "rx-non-rel", les, err,
247 func(les LEs) string {
249 "Tossing file %s/%s (%s): %s",
250 sender.Name, pktName,
251 humanize.IBytes(pktSize), dst,
257 incoming := sender.Incoming
259 err = errors.New("incoming is not allowed")
261 "rx-no-incoming", les, err,
262 func(les LEs) string {
264 "Tossing file %s/%s (%s): %s",
265 sender.Name, pktName,
266 humanize.IBytes(pktSize), dst,
272 dir := filepath.Join(*incoming, path.Dir(dst))
273 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
274 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
276 "Tossing file %s/%s (%s): %s: mkdir",
277 sender.Name, pktName,
278 humanize.IBytes(pktSize), dst,
284 tmp, err := TempFile(dir, "file")
286 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
288 "Tossing file %s/%s (%s): %s: mktemp",
289 sender.Name, pktName,
290 humanize.IBytes(pktSize), dst,
295 les = append(les, LE{"Tmp", tmp.Name()})
296 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
298 "Tossing file %s/%s (%s): %s: created: %s",
299 sender.Name, pktName,
300 humanize.IBytes(pktSize), dst, tmp.Name(),
303 bufW := bufio.NewWriter(tmp)
304 if _, err = CopyProgressed(
305 bufW, pipeR, "Rx file",
306 append(les, LE{"FullSize", int64(pktSize)}),
309 ctx.LogE("rx-copy", les, err, func(les LEs) string {
311 "Tossing file %s/%s (%s): %s: copying",
312 sender.Name, pktName,
313 humanize.IBytes(pktSize), dst,
318 if err = bufW.Flush(); err != nil {
320 ctx.LogE("rx-flush", les, err, func(les LEs) string {
322 "Tossing file %s/%s (%s): %s: flushing",
323 sender.Name, pktName,
324 humanize.IBytes(pktSize), dst,
329 if err = tmp.Sync(); err != nil {
331 ctx.LogE("rx-sync", les, err, func(les LEs) string {
333 "Tossing file %s/%s (%s): %s: syncing",
334 sender.Name, pktName,
335 humanize.IBytes(pktSize), dst,
340 if err = tmp.Close(); err != nil {
341 ctx.LogE("rx-close", les, err, func(les LEs) string {
343 "Tossing file %s/%s (%s): %s: closing",
344 sender.Name, pktName,
345 humanize.IBytes(pktSize), dst,
350 dstPathOrig := filepath.Join(*incoming, dst)
351 dstPath := dstPathOrig
354 if _, err = os.Stat(dstPath); err != nil {
355 if os.IsNotExist(err) {
358 ctx.LogE("rx-stat", les, err, func(les LEs) string {
360 "Tossing file %s/%s (%s): %s: stating: %s",
361 sender.Name, pktName,
362 humanize.IBytes(pktSize), dst, dstPath,
367 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
370 if err = os.Rename(tmp.Name(), dstPath); err != nil {
371 ctx.LogE("rx-rename", les, err, func(les LEs) string {
373 "Tossing file %s/%s (%s): %s: renaming",
374 sender.Name, pktName,
375 humanize.IBytes(pktSize), dst,
380 if err = DirSync(*incoming); err != nil {
381 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
383 "Tossing file %s/%s (%s): %s: dirsyncing",
384 sender.Name, pktName,
385 humanize.IBytes(pktSize), dst,
390 les = les[:len(les)-1] // delete Tmp
392 ctx.LogI("rx", les, func(les LEs) string {
394 "Got file %s (%s) from %s",
395 dst, humanize.IBytes(pktSize), sender.Name,
401 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
404 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
406 if err = DirSync(filepath.Dir(jobPath)); err != nil {
407 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
409 "Tossing file %s/%s (%s): %s: dirsyncing",
410 sender.Name, pktName,
411 humanize.IBytes(pktSize),
412 filepath.Base(jobPath),
419 if err = os.Remove(jobPath); err != nil {
420 ctx.LogE("rx-remove", les, err, func(les LEs) string {
422 "Tossing file %s/%s (%s): %s: removing",
423 sender.Name, pktName,
424 humanize.IBytes(pktSize), dst,
428 } else if ctx.HdrUsage {
429 os.Remove(JobPath2Hdr(jobPath))
432 if len(sendmail) > 0 && ctx.NotifyFile != nil {
435 append(sendmail[1:], ctx.NotifyFile.To)...,
437 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
438 "File from %s: %s (%s)",
439 sender.Name, dst, humanize.IBytes(pktSize),
441 if err = cmd.Run(); err != nil {
442 ctx.LogE("rx-notify", les, err, func(les LEs) string {
444 "Tossing file %s/%s (%s): %s: notifying",
445 sender.Name, pktName,
446 humanize.IBytes(pktSize), dst,
457 src := string(pkt.Path[:int(pkt.PathLen)])
458 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
459 if filepath.IsAbs(src) {
460 err = errors.New("non-relative source path")
462 "rx-non-rel", les, err,
463 func(les LEs) string {
465 "Tossing freq %s/%s (%s): %s: notifying",
466 sender.Name, pktName,
467 humanize.IBytes(pktSize), src,
473 dstRaw, err := ioutil.ReadAll(pipeR)
475 ctx.LogE("rx-read", les, err, func(les LEs) string {
477 "Tossing freq %s/%s (%s): %s: reading",
478 sender.Name, pktName,
479 humanize.IBytes(pktSize), src,
484 dst := string(dstRaw)
485 les = append(les, LE{"Dst", dst})
486 freqPath := sender.FreqPath
488 err = errors.New("freqing is not allowed")
490 "rx-no-freq", les, err,
491 func(les LEs) string {
493 "Tossing freq %s/%s (%s): %s -> %s",
494 sender.Name, pktName,
495 humanize.IBytes(pktSize), src, dst,
505 filepath.Join(*freqPath, src),
513 ctx.LogE("rx-tx", les, err, func(les LEs) string {
515 "Tossing freq %s/%s (%s): %s -> %s: txing",
516 sender.Name, pktName,
517 humanize.IBytes(pktSize), src, dst,
523 ctx.LogI("rx", les, func(les LEs) string {
524 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
529 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
532 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
534 if err = DirSync(filepath.Dir(jobPath)); err != nil {
535 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
537 "Tossing file %s/%s (%s): %s: dirsyncing",
538 sender.Name, pktName,
539 humanize.IBytes(pktSize),
540 filepath.Base(jobPath),
547 if err = os.Remove(jobPath); err != nil {
548 ctx.LogE("rx-remove", les, err, func(les LEs) string {
550 "Tossing freq %s/%s (%s): %s -> %s: removing",
551 sender.Name, pktName,
552 humanize.IBytes(pktSize), src, dst,
556 } else if ctx.HdrUsage {
557 os.Remove(JobPath2Hdr(jobPath))
560 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
563 append(sendmail[1:], ctx.NotifyFreq.To)...,
565 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
566 "Freq from %s: %s", sender.Name, src,
568 if err = cmd.Run(); err != nil {
569 ctx.LogE("rx-notify", les, err, func(les LEs) string {
571 "Tossing freq %s/%s (%s): %s -> %s: notifying",
572 sender.Name, pktName,
573 humanize.IBytes(pktSize), src, dst,
584 dst := new([MTHSize]byte)
585 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
586 nodeId := NodeId(*dst)
587 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
588 logMsg := func(les LEs) string {
590 "Tossing trns %s/%s (%s): %s",
591 sender.Name, pktName,
592 humanize.IBytes(pktSize),
596 node := ctx.Neigh[nodeId]
598 err = errors.New("unknown node")
599 ctx.LogE("rx-unknown", les, err, logMsg)
602 ctx.LogD("rx-tx", les, logMsg)
604 if len(node.Via) == 0 {
605 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
606 ctx.LogE("rx", les, err, func(les LEs) string {
607 return logMsg(les) + ": txing"
612 via := node.Via[:len(node.Via)-1]
613 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
614 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
615 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
628 ctx.LogE("rx", les, err, func(les LEs) string {
629 return logMsg(les) + ": txing"
635 ctx.LogI("rx", les, func(les LEs) string {
637 "Got transitional packet from %s to %s (%s)",
639 ctx.NodeName(&nodeId),
640 humanize.IBytes(pktSize),
643 if !dryRun && jobPath != "" {
645 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
648 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
650 if err = DirSync(filepath.Dir(jobPath)); err != nil {
651 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
653 "Tossing file %s/%s (%s): %s: dirsyncing",
654 sender.Name, pktName,
655 humanize.IBytes(pktSize),
656 filepath.Base(jobPath),
663 if err = os.Remove(jobPath); err != nil {
664 ctx.LogE("rx", les, err, func(les LEs) string {
666 "Tossing trns %s/%s (%s): %s: removing",
667 sender.Name, pktName,
668 humanize.IBytes(pktSize),
669 ctx.NodeName(&nodeId),
673 } else if ctx.HdrUsage {
674 os.Remove(JobPath2Hdr(jobPath))
682 areaId := new(AreaId)
683 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
684 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
685 logMsg := func(les LEs) string {
687 "Tossing %s/%s (%s): area %s",
688 sender.Name, pktName,
689 humanize.IBytes(pktSize),
690 ctx.AreaName(areaId),
693 area := ctx.AreaId2Area[*areaId]
695 err = errors.New("unknown area")
696 ctx.LogE("rx-area-unknown", les, err, logMsg)
699 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
700 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
702 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
705 msgHashRaw := blake2b.Sum256(pktEncRaw)
706 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
707 les = append(les, LE{"AreaMsg", msgHash})
708 ctx.LogD("rx-area", les, logMsg)
711 for _, nodeId := range area.Subs {
712 node := ctx.Neigh[*nodeId]
713 lesEcho := append(les, LE{"Echo", nodeId})
714 seenDir := filepath.Join(
715 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
717 seenPath := filepath.Join(seenDir, msgHash)
718 logMsgNode := func(les LEs) string {
720 "%s: echoing to: %s", logMsg(les), node.Name,
723 if _, err := os.Stat(seenPath); err == nil {
724 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
725 return logMsgNode(les) + ": already sent"
729 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
732 for _, nodeId := range area.Subs {
733 node := ctx.Neigh[*nodeId]
734 lesEcho := append(les, LE{"Echo", nodeId})
735 seenDir := filepath.Join(
736 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
738 seenPath := filepath.Join(seenDir, msgHash)
739 logMsgNode := func(les LEs) string {
740 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
742 if _, err := os.Stat(seenPath); err == nil {
743 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
744 return logMsgNode(les) + ": already sent"
748 if nodeId != sender.Id && nodeId != pktEnc.Sender {
749 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
751 node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil,
753 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
757 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
758 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
761 if fd, err := os.Create(seenPath); err == nil {
763 if err = DirSync(seenDir); err != nil {
764 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
768 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
771 return JobRepeatProcess
775 seenDir := filepath.Join(
776 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
778 seenPath := filepath.Join(seenDir, msgHash)
779 if _, err := os.Stat(seenPath); err == nil {
780 ctx.LogD("rx-area-seen", les, func(les LEs) string {
781 return logMsg(les) + ": already seen"
783 if !dryRun && jobPath != "" {
784 if err = os.Remove(jobPath); err != nil {
785 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
787 "Tossing area %s/%s (%s): %s: removing",
788 sender.Name, pktName,
789 humanize.IBytes(pktSize),
794 } else if ctx.HdrUsage {
795 os.Remove(JobPath2Hdr(jobPath))
802 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
803 return logMsg(les) + ": no private key for decoding"
806 signatureVerify := true
807 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
808 if !area.AllowUnknown {
809 err = errors.New("unknown sender")
812 append(les, LE{"Sender", pktEnc.Sender}),
814 func(les LEs) string {
815 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
820 signatureVerify = false
822 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
823 copy(areaNodeOur.Id[:], area.Id[:])
824 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
828 Incoming: area.Incoming,
831 copy(areaNode.Id[:], area.Id[:])
832 pktName := fmt.Sprintf(
834 Base32Codec.EncodeToString(areaId[:]), msgHash,
837 pipeR, pipeW := io.Pipe()
838 errs := make(chan error, 1)
847 uint64(pktSizeWithoutEnc(int64(pktSize))),
850 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
853 _, _, _, err = PktEncRead(
862 pipeW.CloseWithError(err)
867 if err = <-errs; err != nil {
872 if !dryRun && jobPath != "" {
873 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
874 ctx.LogE("rx-area-mkdir", les, err, logMsg)
877 if fd, err := os.Create(seenPath); err == nil {
879 if err = DirSync(seenDir); err != nil {
880 ctx.LogE("rx-area-dirsync", les, err, logMsg)
884 if err = os.Remove(jobPath); err != nil {
885 ctx.LogE("rx", les, err, func(les LEs) string {
887 "Tossing area %s/%s (%s): %s: removing",
888 sender.Name, pktName,
889 humanize.IBytes(pktSize),
894 } else if ctx.HdrUsage {
895 os.Remove(JobPath2Hdr(jobPath))
900 err = errors.New("unknown type")
902 "rx-type-unknown", les, err,
903 func(les LEs) string {
905 "Tossing %s/%s (%s)",
906 sender.Name, pktName, humanize.IBytes(pktSize),
915 func (ctx *Ctx) Toss(
919 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
921 dirLock, err := ctx.LockDir(nodeId, "toss")
925 defer ctx.UnlockDir(dirLock)
927 decompressor, err := zstd.NewReader(nil)
931 defer decompressor.Close()
932 for job := range ctx.Jobs(nodeId, xx) {
933 pktName := filepath.Base(job.Path)
935 {"Node", job.PktEnc.Sender},
937 {"Nice", int(job.PktEnc.Nice)},
939 if job.PktEnc.Nice > nice {
940 ctx.LogD("rx-too-nice", les, func(les LEs) string {
942 "Tossing %s/%s: too nice: %s",
943 ctx.NodeName(job.PktEnc.Sender), pktName,
944 NicenessFmt(job.PktEnc.Nice),
949 fd, err := os.Open(job.Path)
951 ctx.LogE("rx-open", les, err, func(les LEs) string {
953 "Tossing %s/%s: opening %s",
954 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
960 sender := ctx.Neigh[*job.PktEnc.Sender]
962 err := errors.New("unknown node")
963 ctx.LogE("rx-open", les, err, func(les LEs) string {
966 ctx.NodeName(job.PktEnc.Sender), pktName,
972 errs := make(chan error, 1)
975 pipeR, pipeW := io.Pipe()
984 uint64(pktSizeWithoutEnc(job.Size)),
987 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea,
990 pipeWB := bufio.NewWriter(pipeW)
991 sharedKey, _, _, err = PktEncRead(
1000 pipeW.CloseWithError(err)
1002 if err := pipeWB.Flush(); err != nil {
1003 pipeW.CloseWithError(err)
1013 if err = <-errs; err == JobRepeatProcess {
1014 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1015 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1017 "Tossing %s/%s: can not seek",
1018 ctx.NodeName(job.PktEnc.Sender),
1026 } else if err != nil {
1034 func (ctx *Ctx) AutoToss(
1037 doSeen, noFile, noFreq, noExec, noTrns, noArea bool,
1038 ) (chan struct{}, chan bool) {
1039 dw, err := ctx.NewDirWatcher(
1040 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1046 finish := make(chan struct{})
1047 badCode := make(chan bool)
1058 nodeId, TRx, nice, false,
1059 doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad
1063 return finish, badCode