2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
37 xdr "github.com/davecgh/go-xdr/xdr2"
38 "github.com/dustin/go-humanize"
39 "github.com/klauspost/compress/zstd"
40 "golang.org/x/crypto/blake2b"
41 "golang.org/x/crypto/poly1305"
48 func jobPath2Seen(jobPath string) string {
49 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
// newNotification builds the text of a notification e-mail and returns it
// as an io.Reader.
//
// Visible pieces of the message:
//   - a "From:" header taken from fromTo.From;
//   - a "Subject:" header holding subject encoded by mime.BEncoding with the
//     "UTF-8" charset;
//   - "Content-Type"/"Content-Transfer-Encoding" headers announcing a
//     Base64-encoded UTF-8 text part, followed by body encoded with
//     base64.StdEncoding.
//
// All collected lines are joined with "\n".
//
// NOTE(review): several lines of this function are absent from the reviewed
// listing (the declaration of `lines`, any further headers, and whatever
// condition guards the body part) — confirm against the complete file.
52 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
54 "From: " + fromTo.From,
// The subject may contain non-ASCII text, hence the MIME "B" encoding.
56 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
62 "Content-Type: text/plain; charset=utf-8",
63 "Content-Transfer-Encoding: base64",
65 base64.StdEncoding.EncodeToString(body),
68 return strings.NewReader(strings.Join(lines, "\n"))
71 func pktSizeWithoutEnc(pktSize int64) int64 {
72 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
73 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
74 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
75 pktSize -= poly1305.TagSize
77 pktSize -= pktSizeBlocks * poly1305.TagSize
// JobRepeatProcess is the sentinel error reporting that a job needs to be
// processed once more. It carries no extra data; callers recognise it by
// comparing the returned error against this value.
var JobRepeatProcess = errors.New("needs processing repeat")
// NOTE(review): the `func` line and the leading parameters of this function
// are not part of the reviewed listing, and many interior lines (conditions,
// returns, closing braces, all but one `case` label) are absent as well. The
// comments below describe only what the visible lines show.
//
// Visible behaviour: a packet is decoded from pipeR and handled according to
// its type. The log entries name the kinds "exec", "file", "freq", "trns",
// "area" and "ack"; anything else is reported as "unknown type". The boolean
// parameters dryRun/doSeen/no* are passed in by the caller; how each of them
// gates a branch is mostly on lines that are not visible.
92 decompressor *zstd.Decoder,
93 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
// Command line used for notification mails: the "sendmail" entry of the Exec
// map of our own node (looked up in ctx.Neigh by *ctx.SelfId).
96 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
// Decode pkt from the stream with XDR; a failure is logged as "rx-unmarshal".
98 _, err := xdr.Unmarshal(pipeR, &pkt)
100 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
101 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
105 les = append(les, LE{"Size", int64(pktSize)})
106 ctx.LogD("rx", les, func(les LEs) string {
108 "Tossing %s/%s (%s)",
109 sender.Name, pktName,
110 humanize.IBytes(pktSize),
// ---- exec packets ----
114 case PktTypeExec, PktTypeExecFat:
// The first PathLen bytes of pkt.Path are NUL-separated: the first element is
// the handle, the remaining ones are its arguments.
118 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
119 handle := string(path[0])
120 args := make([]string, 0, len(path)-1)
121 for _, p := range path[1:] {
122 args = append(args, string(p))
124 argsStr := strings.Join(append([]string{handle}, args...), " ")
125 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
// The handle selects a command line from the sender's Exec map; an empty or
// missing entry is reported as "No handle found".
126 cmdline := sender.Exec[handle]
127 if len(cmdline) == 0 {
128 err = errors.New("No handle found")
130 "rx-no-handle", les, err,
131 func(les LEs) string {
133 "Tossing exec %s/%s (%s): %s",
134 sender.Name, pktName,
135 humanize.IBytes(pktSize), argsStr,
// For PktTypeExec the decompressor is reset to read from pipeR and later used
// as the command's stdin. What PktTypeExecFat uses instead is on lines that
// are not visible here.
141 if pkt.Type == PktTypeExec {
142 if err = decompressor.Reset(pipeR); err != nil {
// Configured command line with the packet's arguments appended.
147 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
// NNCP_* strings built for the command — presumably its environment; the
// enclosing lines are missing, confirm.
150 "NNCP_SELF="+ctx.Self.Id.String(),
151 "NNCP_SENDER="+sender.Id.String(),
152 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
154 if pkt.Type == PktTypeExec {
155 cmd.Stdin = decompressor
// Run the command, capturing stdout and stderr together; the output is
// attached to the log entries split into lines.
159 output, err := cmd.CombinedOutput()
161 les = append(les, LE{"Output", strings.Split(
162 strings.Trim(string(output), "\n"), "\n"),
164 ctx.LogE("rx-handle", les, err, func(les LEs) string {
166 "Tossing exec %s/%s (%s): %s: handling",
167 sender.Name, pktName,
168 humanize.IBytes(uint64(pktSize)), argsStr,
// Optional mail notification: ctx.NotifyExec is consulted under
// "<sender name>.<handle>" and under "*.<handle>" (the condition between the
// two lookups is not visible — presumably a fallback).
173 if len(sendmail) > 0 && ctx.NotifyExec != nil {
174 notify := ctx.NotifyExec[sender.Name+"."+handle]
176 notify = ctx.NotifyExec["*."+handle]
181 append(sendmail[1:], notify.To)...,
183 cmd.Stdin = newNotification(notify, fmt.Sprintf(
184 "Exec from %s: %s", sender.Name, argsStr,
186 if err = cmd.Run(); err != nil {
187 ctx.LogE("rx-notify", les, err, func(les LEs) string {
189 "Tossing exec %s/%s (%s): %s: notifying",
190 sender.Name, pktName,
191 humanize.IBytes(pktSize), argsStr,
198 ctx.LogI("rx", les, func(les LEs) string {
200 "Got exec from %s to %s (%s)",
201 sender.Name, argsStr,
202 humanize.IBytes(pktSize),
// Post-processing when not in dry-run mode and the job has a path: a marker
// file is created at jobPath2Seen(jobPath) and the job's directory is synced,
// then the job file is removed; with ctx.HdrUsage the file named by
// JobPath2Hdr(jobPath) is removed too. The condition selecting the marker
// step is not visible.
205 if !dryRun && jobPath != "" {
207 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
210 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
212 if err = DirSync(filepath.Dir(jobPath)); err != nil {
213 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
215 "Tossing file %s/%s (%s): %s: dirsyncing",
216 sender.Name, pktName,
217 humanize.IBytes(pktSize),
218 filepath.Base(jobPath),
// NOTE(review): this is the job-removal failure, yet it is logged under
// "rx-notify" with a "...: notifying" message, while the file and freq
// branches use "rx-remove"/"removing" — looks like a copy-paste slip; confirm.
225 if err = os.Remove(jobPath); err != nil {
226 ctx.LogE("rx-notify", les, err, func(les LEs) string {
228 "Tossing exec %s/%s (%s): %s: notifying",
229 sender.Name, pktName,
230 humanize.IBytes(pktSize), argsStr,
234 } else if ctx.HdrUsage {
235 os.Remove(JobPath2Hdr(jobPath))
// ---- file packets (log entry Type "file"; the `case` label is not visible) ----
// The destination path comes from pkt.Path; absolute paths are rejected.
243 dst := string(pkt.Path[:int(pkt.PathLen)])
244 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
245 if filepath.IsAbs(dst) {
246 err = errors.New("non-relative destination path")
248 "rx-non-rel", les, err,
249 func(les LEs) string {
251 "Tossing file %s/%s (%s): %s",
252 sender.Name, pktName,
253 humanize.IBytes(pktSize), dst,
// sender.Incoming is dereferenced below, so it is a pointer; the check that
// produces "incoming is not allowed" is on a missing line (presumably a nil
// test — confirm).
259 incoming := sender.Incoming
261 err = errors.New("incoming is not allowed")
263 "rx-no-incoming", les, err,
264 func(les LEs) string {
266 "Tossing file %s/%s (%s): %s",
267 sender.Name, pktName,
268 humanize.IBytes(pktSize), dst,
// Create the destination directory below *incoming.
274 dir := filepath.Join(*incoming, path.Dir(dst))
275 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
276 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
278 "Tossing file %s/%s (%s): %s: mkdir",
279 sender.Name, pktName,
280 humanize.IBytes(pktSize), dst,
// The payload is first written to a temporary file created in that directory.
286 tmp, err := TempFile(dir, "file")
288 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
290 "Tossing file %s/%s (%s): %s: mktemp",
291 sender.Name, pktName,
292 humanize.IBytes(pktSize), dst,
297 les = append(les, LE{"Tmp", tmp.Name()})
298 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
300 "Tossing file %s/%s (%s): %s: created: %s",
301 sender.Name, pktName,
302 humanize.IBytes(pktSize), dst, tmp.Name(),
// Copy pipeR into the temporary file through a buffered writer, then flush,
// sync and close it; each step logs its own tag on failure.
305 bufW := bufio.NewWriter(tmp)
306 if _, err = CopyProgressed(
307 bufW, pipeR, "Rx file",
308 append(les, LE{"FullSize", int64(pktSize)}),
311 ctx.LogE("rx-copy", les, err, func(les LEs) string {
313 "Tossing file %s/%s (%s): %s: copying",
314 sender.Name, pktName,
315 humanize.IBytes(pktSize), dst,
320 if err = bufW.Flush(); err != nil {
322 ctx.LogE("rx-flush", les, err, func(les LEs) string {
324 "Tossing file %s/%s (%s): %s: flushing",
325 sender.Name, pktName,
326 humanize.IBytes(pktSize), dst,
332 if err = tmp.Sync(); err != nil {
334 ctx.LogE("rx-sync", les, err, func(les LEs) string {
336 "Tossing file %s/%s (%s): %s: syncing",
337 sender.Name, pktName,
338 humanize.IBytes(pktSize), dst,
344 if err = tmp.Close(); err != nil {
345 ctx.LogE("rx-close", les, err, func(les LEs) string {
347 "Tossing file %s/%s (%s): %s: closing",
348 sender.Name, pktName,
349 humanize.IBytes(pktSize), dst,
// Choose the final name: start with <incoming>/<dst>; candidates of the form
// "<original>.<counter>" are built below. The loop header and the counter
// handling are on missing lines — presumably it retries until os.Stat reports
// that the name does not exist; confirm.
354 dstPathOrig := filepath.Join(*incoming, dst)
355 dstPath := dstPathOrig
358 if _, err = os.Stat(dstPath); err != nil {
359 if os.IsNotExist(err) {
362 ctx.LogE("rx-stat", les, err, func(les LEs) string {
364 "Tossing file %s/%s (%s): %s: stating: %s",
365 sender.Name, pktName,
366 humanize.IBytes(pktSize), dst, dstPath,
371 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
// Move the temporary file into place and sync the incoming directory.
374 if err = os.Rename(tmp.Name(), dstPath); err != nil {
375 ctx.LogE("rx-rename", les, err, func(les LEs) string {
377 "Tossing file %s/%s (%s): %s: renaming",
378 sender.Name, pktName,
379 humanize.IBytes(pktSize), dst,
384 if err = DirSync(*incoming); err != nil {
385 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
387 "Tossing file %s/%s (%s): %s: dirsyncing",
388 sender.Name, pktName,
389 humanize.IBytes(pktSize), dst,
394 les = les[:len(les)-1] // delete Tmp
396 ctx.LogI("rx", les, func(les LEs) string {
398 "Got file %s (%s) from %s",
399 dst, humanize.IBytes(pktSize), sender.Name,
// Marker file, job removal and header cleanup, same steps as in the exec
// branch.
405 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
408 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
410 if err = DirSync(filepath.Dir(jobPath)); err != nil {
411 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
413 "Tossing file %s/%s (%s): %s: dirsyncing",
414 sender.Name, pktName,
415 humanize.IBytes(pktSize),
416 filepath.Base(jobPath),
423 if err = os.Remove(jobPath); err != nil {
424 ctx.LogE("rx-remove", les, err, func(les LEs) string {
426 "Tossing file %s/%s (%s): %s: removing",
427 sender.Name, pktName,
428 humanize.IBytes(pktSize), dst,
432 } else if ctx.HdrUsage {
433 os.Remove(JobPath2Hdr(jobPath))
// Optional mail notification about the received file, addressed to
// ctx.NotifyFile.To.
436 if len(sendmail) > 0 && ctx.NotifyFile != nil {
439 append(sendmail[1:], ctx.NotifyFile.To)...,
441 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
442 "File from %s: %s (%s)",
443 sender.Name, dst, humanize.IBytes(pktSize),
445 if err = cmd.Run(); err != nil {
446 ctx.LogE("rx-notify", les, err, func(les LEs) string {
448 "Tossing file %s/%s (%s): %s: notifying",
449 sender.Name, pktName,
450 humanize.IBytes(pktSize), dst,
// ---- file request packets (log entry Type "freq"; `case` label not visible) ----
// The requested source path comes from pkt.Path; absolute paths are rejected.
// `les :=` declares a new les for this scope instead of assigning the outer one.
461 src := string(pkt.Path[:int(pkt.PathLen)])
462 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
463 if filepath.IsAbs(src) {
464 err = errors.New("non-relative source path")
// NOTE(review): the message below ends with ": notifying" although this is
// the path rejection — looks copied from a notification log call; confirm.
466 "rx-non-rel", les, err,
467 func(les LEs) string {
469 "Tossing freq %s/%s (%s): %s: notifying",
470 sender.Name, pktName,
471 humanize.IBytes(pktSize), src,
// The whole remaining payload is read and used as the destination path.
477 dstRaw, err := io.ReadAll(pipeR)
479 ctx.LogE("rx-read", les, err, func(les LEs) string {
481 "Tossing freq %s/%s (%s): %s: reading",
482 sender.Name, pktName,
483 humanize.IBytes(pktSize), src,
488 dst := string(dstRaw)
489 les = append(les, LE{"Dst", dst})
// sender.FreqPath is dereferenced below, so it is a pointer; the check that
// produces "freqing is not allowed" is on a missing line (presumably a nil
// test — confirm).
490 freqPath := sender.FreqPath
492 err = errors.New("freqing is not allowed")
494 "rx-no-freq", les, err,
495 func(les LEs) string {
497 "Tossing freq %s/%s (%s): %s -> %s",
498 sender.Name, pktName,
499 humanize.IBytes(pktSize), src, dst,
// The requested file is <*freqPath>/<src>; it is an argument of a call whose
// other lines are missing (failures are logged as "rx-tx" / "txing").
509 filepath.Join(*freqPath, src),
517 ctx.LogE("rx-tx", les, err, func(les LEs) string {
519 "Tossing freq %s/%s (%s): %s -> %s: txing",
520 sender.Name, pktName,
521 humanize.IBytes(pktSize), src, dst,
527 ctx.LogI("rx", les, func(les LEs) string {
528 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
// Marker file, job removal and header cleanup.
// NOTE(review): the dirsync message says "Tossing file" inside the freq
// branch — confirm whether that wording is intended.
533 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
536 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
538 if err = DirSync(filepath.Dir(jobPath)); err != nil {
539 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
541 "Tossing file %s/%s (%s): %s: dirsyncing",
542 sender.Name, pktName,
543 humanize.IBytes(pktSize),
544 filepath.Base(jobPath),
551 if err = os.Remove(jobPath); err != nil {
552 ctx.LogE("rx-remove", les, err, func(les LEs) string {
554 "Tossing freq %s/%s (%s): %s -> %s: removing",
555 sender.Name, pktName,
556 humanize.IBytes(pktSize), src, dst,
560 } else if ctx.HdrUsage {
561 os.Remove(JobPath2Hdr(jobPath))
// Optional mail notification about the request, addressed to ctx.NotifyFreq.To.
564 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
567 append(sendmail[1:], ctx.NotifyFreq.To)...,
569 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
570 "Freq from %s: %s", sender.Name, src,
572 if err = cmd.Run(); err != nil {
573 ctx.LogE("rx-notify", les, err, func(les LEs) string {
575 "Tossing freq %s/%s (%s): %s -> %s: notifying",
576 sender.Name, pktName,
577 humanize.IBytes(pktSize), src, dst,
// ---- transitional packets (log entry Type "trns"; `case` label not visible) ----
// pkt.Path is copied into an MTHSize-byte array and converted to a NodeId.
588 dst := new([MTHSize]byte)
589 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
590 nodeId := NodeId(*dst)
591 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
592 logMsg := func(les LEs) string {
594 "Tossing trns %s/%s (%s): %s",
595 sender.Name, pktName,
596 humanize.IBytes(pktSize),
// The destination is looked up in ctx.Neigh; the check that produces
// "unknown node" is on a missing line.
600 node := ctx.Neigh[nodeId]
602 err = errors.New("unknown node")
603 ctx.LogE("rx-unknown", les, err, logMsg)
606 ctx.LogD("rx-tx", les, logMsg)
// With an empty Via list the payload is handed to ctx.TxTrns for that node.
608 if len(node.Via) == 0 {
609 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
610 ctx.LogE("rx", les, err, func(les LEs) string {
611 return logMsg(les) + ": txing"
// Otherwise the last Via entry becomes the node to send to (keeping the
// remaining Via entries), and a new PktTypeTrns packet whose path is nodeId
// is built and sent with ctx.Tx.
616 via := node.Via[:len(node.Via)-1]
617 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
618 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
619 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
623 if _, _, _, err = ctx.Tx(
627 int64(pktSize), 0, MaxFileSize,
632 ctx.LogE("rx", les, err, func(les LEs) string {
633 return logMsg(les) + ": txing"
639 ctx.LogI("rx", les, func(les LEs) string {
641 "Got transitional packet from %s to %s (%s)",
643 ctx.NodeName(&nodeId),
644 humanize.IBytes(pktSize),
// Marker file, job removal and header cleanup.
// NOTE(review): the dirsync message says "Tossing file" inside the trns
// branch — confirm whether that wording is intended.
647 if !dryRun && jobPath != "" {
649 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
652 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
654 if err = DirSync(filepath.Dir(jobPath)); err != nil {
655 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
657 "Tossing file %s/%s (%s): %s: dirsyncing",
658 sender.Name, pktName,
659 humanize.IBytes(pktSize),
660 filepath.Base(jobPath),
667 if err = os.Remove(jobPath); err != nil {
668 ctx.LogE("rx", les, err, func(les LEs) string {
670 "Tossing trns %s/%s (%s): %s: removing",
671 sender.Name, pktName,
672 humanize.IBytes(pktSize),
673 ctx.NodeName(&nodeId),
677 } else if ctx.HdrUsage {
678 os.Remove(JobPath2Hdr(jobPath))
// ---- area packets (log entry Type "area"; `case` label not visible) ----
// pkt.Path is copied into an AreaId, which is looked up in ctx.AreaId2Area;
// the check that produces "unknown area" is on a missing line.
686 areaId := new(AreaId)
687 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
688 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
689 logMsg := func(les LEs) string {
691 "Tossing %s/%s (%s): area %s",
692 sender.Name, pktName,
693 humanize.IBytes(pktSize),
694 ctx.AreaName(areaId),
697 area := ctx.AreaId2Area[*areaId]
699 err = errors.New("unknown area")
700 ctx.LogE("rx-area-unknown", les, err, logMsg)
// ctx.HdrRead consumes bytes from pipeR and returns them as pktEncRaw; they
// are put back in front of pipeR with io.MultiReader so that fullPipeR yields
// the stream from its beginning.
703 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
704 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
706 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
// The message is identified by the Base32-encoded BLAKE2b-256 hash of
// pktEncRaw; that string is used as the marker file name below.
709 msgHashRaw := blake2b.Sum256(pktEncRaw)
710 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
711 les = append(les, LE{"AreaMsg", msgHash})
712 ctx.LogD("rx-area", les, logMsg)
// First visible loop over area.Subs: it only checks for the subscriber's
// marker file and logs. The condition enclosing this loop is not visible
// (presumably the dry-run variant — confirm).
715 for _, nodeId := range area.Subs {
716 node := ctx.Neigh[*nodeId]
717 lesEcho := append(les, LE{"Echo", nodeId})
718 seenDir := filepath.Join(
719 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
721 seenPath := filepath.Join(seenDir, msgHash)
722 logMsgNode := func(les LEs) string {
724 "%s: echoing to: %s", logMsg(les), node.Name,
727 if _, err := os.Stat(seenPath); err == nil {
728 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
729 return logMsgNode(les) + ": already sent"
733 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
// Second visible loop over area.Subs: the message is sent with ctx.Tx to each
// subscriber that has no marker yet, and a marker file is then created under
// <Spool>/<node id>/<AreaDir>/<area id>/<msgHash>.
736 for _, nodeId := range area.Subs {
737 node := ctx.Neigh[*nodeId]
738 lesEcho := append(les, LE{"Echo", nodeId})
739 seenDir := filepath.Join(
740 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
742 seenPath := filepath.Join(seenDir, msgHash)
743 logMsgNode := func(les LEs) string {
744 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
746 if _, err := os.Stat(seenPath); err == nil {
747 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
748 return logMsgNode(les) + ": already sent"
// NOTE(review): nodeId and pktEnc.Sender are both dereferenced with `*`
// elsewhere in this function, so this looks like a comparison of pointers
// rather than of id values — confirm that pointer identity is what is meant.
752 if nodeId != sender.Id && nodeId != pktEnc.Sender {
753 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
754 if _, _, _, err = ctx.Tx(
758 int64(pktSize), 0, MaxFileSize,
763 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
767 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
768 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
771 if fd, err := os.Create(seenPath); err == nil {
// NOTE(review): this call passes `les` while the neighbouring calls in the
// loop pass `lesEcho` — confirm whether the "Echo" entry was meant to be
// logged here too.
773 if err = DirSync(seenDir); err != nil {
774 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
778 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
// The sentinel asks for the job to be processed again; the condition under
// which it is returned is on a missing line.
781 return JobRepeatProcess
// Marker for our own node: if it already exists the message has been handled
// before, and the job file (plus its header file with ctx.HdrUsage) is
// removed unless this is a dry run.
785 seenDir := filepath.Join(
786 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
788 seenPath := filepath.Join(seenDir, msgHash)
789 if _, err := os.Stat(seenPath); err == nil {
790 ctx.LogD("rx-area-seen", les, func(les LEs) string {
791 return logMsg(les) + ": already seen"
793 if !dryRun && jobPath != "" {
794 if err = os.Remove(jobPath); err != nil {
795 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
797 "Tossing area %s/%s (%s): %s: removing",
798 sender.Name, pktName,
799 humanize.IBytes(pktSize),
804 } else if ctx.HdrUsage {
805 os.Remove(JobPath2Hdr(jobPath))
// Logged when the area cannot be decoded locally; the condition itself is on
// a missing line.
812 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
813 return logMsg(les) + ": no private key for decoding"
// A sender absent from ctx.Neigh is an error ("unknown sender") unless
// area.AllowUnknown is set, in which case signatureVerify is switched off.
816 signatureVerify := true
817 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
818 if !area.AllowUnknown {
819 err = errors.New("unknown sender")
822 append(les, LE{"Sender", pktEnc.Sender}),
824 func(les LEs) string {
825 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
830 signatureVerify = false
// A NodeOur value is filled with the area's id and the area's private key as
// its exchange key; another node value (declaration not visible) receives the
// area id and area.Incoming.
832 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
833 copy(areaNodeOur.Id[:], area.Id[:])
834 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
838 Incoming: area.Incoming,
841 copy(areaNode.Id[:], area.Id[:])
842 pktName := fmt.Sprintf(
844 Base32Codec.EncodeToString(areaId[:]), msgHash,
// A fresh pipe (shadowing the outer pipeR) and a buffered error channel are
// created. The argument lines that follow belong to a call whose name is not
// visible — presumably this function invoked again on the inner packet, with
// the same flags; confirm.
847 pipeR, pipeW := io.Pipe()
848 errs := make(chan error, 1)
857 uint64(pktSizeWithoutEnc(int64(pktSize))),
860 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
// PktEncRead is run on the packet; on failure the error is logged and also
// propagated to the reader side by closing pipeW with it.
863 _, _, _, err = PktEncRead(
872 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
873 pipeW.CloseWithError(err)
// Wait for the result delivered through errs.
878 if err = <-errs; err != nil {
// On success (not dry-run, job has a path): create our own marker file, sync
// its directory, remove the job and, with ctx.HdrUsage, its header file.
883 if !dryRun && jobPath != "" {
884 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
885 ctx.LogE("rx-area-mkdir", les, err, logMsg)
888 if fd, err := os.Create(seenPath); err == nil {
890 if err = DirSync(seenDir); err != nil {
891 ctx.LogE("rx-area-dirsync", les, err, logMsg)
895 if err = os.Remove(jobPath); err != nil {
896 ctx.LogE("rx", les, err, func(les LEs) string {
898 "Tossing area %s/%s (%s): %s: removing",
899 sender.Name, pktName,
900 humanize.IBytes(pktSize),
905 } else if ctx.HdrUsage {
906 os.Remove(JobPath2Hdr(jobPath))
// ---- ACK packets (log entry Type "ack"; `case` label not visible) ----
// The acknowledged packet is named by the Base32 encoding of the first
// MTHSize bytes of pkt.Path.
914 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
915 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
916 logMsg := func(les LEs) string {
917 return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
919 ctx.LogD("rx-ack", les, logMsg)
// The acknowledged packet is expected at <Spool>/<sender id>/<TTx>/<hsh>: if
// it exists it is removed (plus its header file with ctx.HdrUsage), otherwise
// "already disappeared" is logged.
920 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
921 if _, err := os.Stat(pktPath); err == nil {
923 if err = os.Remove(pktPath); err != nil {
924 ctx.LogE("rx-ack", les, err, func(les LEs) string {
925 return logMsg(les) + ": removing packet"
928 } else if ctx.HdrUsage {
929 os.Remove(JobPath2Hdr(pktPath))
933 ctx.LogD("rx-ack", les, func(les LEs) string {
934 return logMsg(les) + ": already disappeared"
// Marker file for the ACK job itself (only with doSeen and outside dry-run),
// then removal of the job and, with ctx.HdrUsage, of its header file.
// NOTE(review): the dirsync message says "Tossing file" inside the ack
// branch — confirm whether that wording is intended.
937 if !dryRun && doSeen {
938 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
941 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
943 if err = DirSync(filepath.Dir(jobPath)); err != nil {
944 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
946 "Tossing file %s/%s (%s): %s: dirsyncing",
947 sender.Name, pktName,
948 humanize.IBytes(pktSize),
949 filepath.Base(jobPath),
957 if err = os.Remove(jobPath); err != nil {
958 ctx.LogE("rx", les, err, func(les LEs) string {
959 return logMsg(les) + ": removing job"
962 } else if ctx.HdrUsage {
963 os.Remove(JobPath2Hdr(jobPath))
966 ctx.LogI("rx", les, func(les LEs) string {
967 return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
// ---- any other packet type ----
971 err = errors.New("unknown type")
973 "rx-type-unknown", les, err,
974 func(les LEs) string {
976 "Tossing %s/%s (%s)",
977 sender.Name, pktName, humanize.IBytes(pktSize),
// Toss walks over the jobs yielded by ctx.Jobs(nodeId, xx) and processes each
// one while holding the "toss" directory lock for nodeId.
//
// Visible steps per job: jobs whose niceness exceeds nice are logged as
// "rx-too-nice"; the job file is opened; the sender is looked up in
// ctx.Neigh; the packet is run through PktEncRead with its output written
// into a pipe; the result arriving on errs is inspected. When that result is
// JobRepeatProcess the job file is rewound to its start.
//
// The flag parameters dryRun, doSeen and no* are passed on unchanged in a
// call whose name is not visible in this listing.
//
// NOTE(review): parts of the signature (nodeId, xx, nice and the result
// type) and many body lines are absent from the reviewed listing; what
// happens after each logged error (skip, return, flagging the run as failed)
// cannot be told from here — confirm against the complete file.
986 func (ctx *Ctx) Toss(
990 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
// The lock is taken first and released when Toss returns.
992 dirLock, err := ctx.LockDir(nodeId, "toss")
996 defer ctx.UnlockDir(dirLock)
// One zstd decoder is created without a source and closed when Toss returns.
998 decompressor, err := zstd.NewReader(nil)
1002 defer decompressor.Close()
1003 for job := range ctx.Jobs(nodeId, xx) {
1004 pktName := filepath.Base(job.Path)
// Log entries shared by all messages about this job.
1006 {"Node", job.PktEnc.Sender},
1008 {"Nice", int(job.PktEnc.Nice)},
// A job with a higher Nice value than the requested limit is only logged here.
1010 if job.PktEnc.Nice > nice {
1011 ctx.LogD("rx-too-nice", les, func(les LEs) string {
1013 "Tossing %s/%s: too nice: %s",
1014 ctx.NodeName(job.PktEnc.Sender), pktName,
1015 NicenessFmt(job.PktEnc.Nice),
1020 fd, err := os.Open(job.Path)
1022 ctx.LogE("rx-open", les, err, func(les LEs) string {
1024 "Tossing %s/%s: opening %s",
1025 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
// The sender must be a known neighbour; the check that produces
// "unknown node" is on a missing line.
1031 sender := ctx.Neigh[*job.PktEnc.Sender]
1033 err := errors.New("unknown node")
1034 ctx.LogE("rx-open", les, err, func(les LEs) string {
1037 ctx.NodeName(job.PktEnc.Sender), pktName,
// errs is buffered with capacity 1; the pipe connects the PktEncRead output
// to its consumer.
1043 errs := make(chan error, 1)
1044 var sharedKey []byte
1046 pipeR, pipeW := io.Pipe()
// Arguments of the consumer call (its name is on a missing line): the packet
// size with the encryption overhead removed, and the caller's flags.
1055 uint64(pktSizeWithoutEnc(job.Size)),
1058 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
// PktEncRead takes the job file through a reader of MTHBlockSize bytes and a
// buffered writer on top of pipeW; errors from it and from the final Flush
// are passed to the reading side with CloseWithError.
1061 pipeWB := bufio.NewWriter(pipeW)
1062 sharedKey, _, _, err = PktEncRead(
1065 bufio.NewReaderSize(fd, MTHBlockSize),
1071 pipeW.CloseWithError(err)
1073 if err := pipeWB.Flush(); err != nil {
1074 pipeW.CloseWithError(err)
// JobRepeatProcess asks for another pass over the same job, so the file is
// rewound; any other non-nil error takes the else-if branch.
1084 if err = <-errs; err == JobRepeatProcess {
1085 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1086 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1088 "Tossing %s/%s: can not seek",
1089 ctx.NodeName(job.PktEnc.Sender),
1097 } else if err != nil {
// AutoToss creates a directory watcher for <Spool>/<nodeId>/<TRx> and returns
// two channels: finish (chan struct{}) and badCode (chan bool), both
// unbuffered.
//
// NOTE(review): the leading parameters (nodeId and nice are used in the
// body) and the code between channel creation and the return are absent from
// the reviewed listing. The visible argument lines appear to belong to a call
// of ctx.Toss with dryRun set to false whose result is OR-ed into `bad`, and
// the channels are presumably used to stop that work and to report `bad` —
// confirm against the complete file.
1105 func (ctx *Ctx) AutoToss(
1108 doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
1109 ) (chan struct{}, chan bool) {
1110 dw, err := ctx.NewDirWatcher(
1111 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1117 finish := make(chan struct{})
1118 badCode := make(chan bool)
// The literal `false` sits in the position where Toss takes dryRun.
1129 nodeId, TRx, nice, false,
1130 doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
1134 return finish, badCode