2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
49 type TossOpts struct {
61 func jobPath2Seen(jobPath string) string {
62 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
65 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
67 "From: " + fromTo.From,
69 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
75 "Content-Type: text/plain; charset=utf-8",
76 "Content-Transfer-Encoding: base64",
78 base64.StdEncoding.EncodeToString(body),
81 return strings.NewReader(strings.Join(lines, "\n"))
84 func pktSizeWithoutEnc(pktSize int64) int64 {
85 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
86 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
87 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
88 pktSize -= poly1305.TagSize
90 pktSize -= pktSizeBlocks * poly1305.TagSize
94 var JobRepeatProcess = errors.New("needs processing repeat")
105 decompressor *zstd.Decoder,
109 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
111 _, err := xdr.Unmarshal(pipeR, &pkt)
113 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
114 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
118 les = append(les, LE{"Size", int64(pktSize)})
119 ctx.LogD("rx", les, func(les LEs) string {
121 "Tossing %s/%s (%s)",
122 sender.Name, pktName,
123 humanize.IBytes(pktSize),
127 case PktTypeExec, PktTypeExecFat:
131 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
132 handle := string(path[0])
133 args := make([]string, 0, len(path)-1)
134 for _, p := range path[1:] {
135 args = append(args, string(p))
137 argsStr := strings.Join(append([]string{handle}, args...), " ")
138 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
139 cmdline := sender.Exec[handle]
140 if len(cmdline) == 0 {
141 err = errors.New("No handle found")
143 "rx-no-handle", les, err,
144 func(les LEs) string {
146 "Tossing exec %s/%s (%s): %s",
147 sender.Name, pktName,
148 humanize.IBytes(pktSize), argsStr,
154 if pkt.Type == PktTypeExec {
155 if err = decompressor.Reset(pipeR); err != nil {
160 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
163 "NNCP_SELF="+ctx.Self.Id.String(),
164 "NNCP_SENDER="+sender.Id.String(),
165 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
167 if pkt.Type == PktTypeExec {
168 cmd.Stdin = decompressor
172 output, err := cmd.CombinedOutput()
174 les = append(les, LE{"Output", strings.Split(
175 strings.Trim(string(output), "\n"), "\n"),
177 ctx.LogE("rx-handle", les, err, func(les LEs) string {
179 "Tossing exec %s/%s (%s): %s: handling",
180 sender.Name, pktName,
181 humanize.IBytes(uint64(pktSize)), argsStr,
186 if len(sendmail) > 0 && ctx.NotifyExec != nil {
187 notify := ctx.NotifyExec[sender.Name+"."+handle]
189 notify = ctx.NotifyExec["*."+handle]
194 append(sendmail[1:], notify.To)...,
196 cmd.Stdin = newNotification(notify, fmt.Sprintf(
197 "Exec from %s: %s", sender.Name, argsStr,
199 if err = cmd.Run(); err != nil {
200 ctx.LogE("rx-notify", les, err, func(les LEs) string {
202 "Tossing exec %s/%s (%s): %s: notifying",
203 sender.Name, pktName,
204 humanize.IBytes(pktSize), argsStr,
211 ctx.LogI("rx", les, func(les LEs) string {
213 "Got exec from %s to %s (%s)",
214 sender.Name, argsStr,
215 humanize.IBytes(pktSize),
218 if !opts.DryRun && jobPath != "" {
220 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
223 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
225 if err = DirSync(filepath.Dir(jobPath)); err != nil {
226 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
228 "Tossing file %s/%s (%s): %s: dirsyncing",
229 sender.Name, pktName,
230 humanize.IBytes(pktSize),
231 filepath.Base(jobPath),
238 if err = os.Remove(jobPath); err != nil {
239 ctx.LogE("rx-notify", les, err, func(les LEs) string {
241 "Tossing exec %s/%s (%s): %s: notifying",
242 sender.Name, pktName,
243 humanize.IBytes(pktSize), argsStr,
247 } else if ctx.HdrUsage {
248 os.Remove(JobPath2Hdr(jobPath))
256 dst := string(pkt.Path[:int(pkt.PathLen)])
257 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
258 if filepath.IsAbs(dst) {
259 err = errors.New("non-relative destination path")
261 "rx-non-rel", les, err,
262 func(les LEs) string {
264 "Tossing file %s/%s (%s): %s",
265 sender.Name, pktName,
266 humanize.IBytes(pktSize), dst,
272 incoming := sender.Incoming
274 err = errors.New("incoming is not allowed")
276 "rx-no-incoming", les, err,
277 func(les LEs) string {
279 "Tossing file %s/%s (%s): %s",
280 sender.Name, pktName,
281 humanize.IBytes(pktSize), dst,
287 dir := filepath.Join(*incoming, path.Dir(dst))
288 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
289 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
291 "Tossing file %s/%s (%s): %s: mkdir",
292 sender.Name, pktName,
293 humanize.IBytes(pktSize), dst,
299 tmp, err := TempFile(dir, "file")
301 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
303 "Tossing file %s/%s (%s): %s: mktemp",
304 sender.Name, pktName,
305 humanize.IBytes(pktSize), dst,
310 les = append(les, LE{"Tmp", tmp.Name()})
311 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
313 "Tossing file %s/%s (%s): %s: created: %s",
314 sender.Name, pktName,
315 humanize.IBytes(pktSize), dst, tmp.Name(),
318 bufW := bufio.NewWriter(tmp)
319 if _, err = CopyProgressed(
320 bufW, pipeR, "Rx file",
321 append(les, LE{"FullSize", int64(pktSize)}),
324 ctx.LogE("rx-copy", les, err, func(les LEs) string {
326 "Tossing file %s/%s (%s): %s: copying",
327 sender.Name, pktName,
328 humanize.IBytes(pktSize), dst,
333 if err = bufW.Flush(); err != nil {
335 ctx.LogE("rx-flush", les, err, func(les LEs) string {
337 "Tossing file %s/%s (%s): %s: flushing",
338 sender.Name, pktName,
339 humanize.IBytes(pktSize), dst,
345 if err = tmp.Sync(); err != nil {
347 ctx.LogE("rx-sync", les, err, func(les LEs) string {
349 "Tossing file %s/%s (%s): %s: syncing",
350 sender.Name, pktName,
351 humanize.IBytes(pktSize), dst,
357 if err = tmp.Close(); err != nil {
358 ctx.LogE("rx-close", les, err, func(les LEs) string {
360 "Tossing file %s/%s (%s): %s: closing",
361 sender.Name, pktName,
362 humanize.IBytes(pktSize), dst,
367 dstPathOrig := filepath.Join(*incoming, dst)
368 dstPath := dstPathOrig
371 if _, err = os.Stat(dstPath); err != nil {
372 if errors.Is(err, fs.ErrNotExist) {
375 ctx.LogE("rx-stat", les, err, func(les LEs) string {
377 "Tossing file %s/%s (%s): %s: stating: %s",
378 sender.Name, pktName,
379 humanize.IBytes(pktSize), dst, dstPath,
384 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
387 if err = os.Rename(tmp.Name(), dstPath); err != nil {
388 ctx.LogE("rx-rename", les, err, func(les LEs) string {
390 "Tossing file %s/%s (%s): %s: renaming",
391 sender.Name, pktName,
392 humanize.IBytes(pktSize), dst,
397 if err = DirSync(*incoming); err != nil {
398 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
400 "Tossing file %s/%s (%s): %s: dirsyncing",
401 sender.Name, pktName,
402 humanize.IBytes(pktSize), dst,
407 les = les[:len(les)-1] // delete Tmp
409 ctx.LogI("rx", les, func(les LEs) string {
411 "Got file %s (%s) from %s",
412 dst, humanize.IBytes(pktSize), sender.Name,
418 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
421 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
423 if err = DirSync(filepath.Dir(jobPath)); err != nil {
424 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
426 "Tossing file %s/%s (%s): %s: dirsyncing",
427 sender.Name, pktName,
428 humanize.IBytes(pktSize),
429 filepath.Base(jobPath),
436 if err = os.Remove(jobPath); err != nil {
437 ctx.LogE("rx-remove", les, err, func(les LEs) string {
439 "Tossing file %s/%s (%s): %s: removing",
440 sender.Name, pktName,
441 humanize.IBytes(pktSize), dst,
445 } else if ctx.HdrUsage {
446 os.Remove(JobPath2Hdr(jobPath))
449 if len(sendmail) > 0 && ctx.NotifyFile != nil {
452 append(sendmail[1:], ctx.NotifyFile.To)...,
454 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
455 "File from %s: %s (%s)",
456 sender.Name, dst, humanize.IBytes(pktSize),
458 if err = cmd.Run(); err != nil {
459 ctx.LogE("rx-notify", les, err, func(les LEs) string {
461 "Tossing file %s/%s (%s): %s: notifying",
462 sender.Name, pktName,
463 humanize.IBytes(pktSize), dst,
474 src := string(pkt.Path[:int(pkt.PathLen)])
475 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
476 if filepath.IsAbs(src) {
477 err = errors.New("non-relative source path")
479 "rx-non-rel", les, err,
480 func(les LEs) string {
482 "Tossing freq %s/%s (%s): %s: notifying",
483 sender.Name, pktName,
484 humanize.IBytes(pktSize), src,
490 dstRaw, err := io.ReadAll(pipeR)
492 ctx.LogE("rx-read", les, err, func(les LEs) string {
494 "Tossing freq %s/%s (%s): %s: reading",
495 sender.Name, pktName,
496 humanize.IBytes(pktSize), src,
501 dst := string(dstRaw)
502 les = append(les, LE{"Dst", dst})
503 freqPath := sender.FreqPath
505 err = errors.New("freqing is not allowed")
507 "rx-no-freq", les, err,
508 func(les LEs) string {
510 "Tossing freq %s/%s (%s): %s -> %s",
511 sender.Name, pktName,
512 humanize.IBytes(pktSize), src, dst,
522 filepath.Join(*freqPath, src),
530 ctx.LogE("rx-tx", les, err, func(les LEs) string {
532 "Tossing freq %s/%s (%s): %s -> %s: txing",
533 sender.Name, pktName,
534 humanize.IBytes(pktSize), src, dst,
540 ctx.LogI("rx", les, func(les LEs) string {
541 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
546 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
549 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
551 if err = DirSync(filepath.Dir(jobPath)); err != nil {
552 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
554 "Tossing file %s/%s (%s): %s: dirsyncing",
555 sender.Name, pktName,
556 humanize.IBytes(pktSize),
557 filepath.Base(jobPath),
564 if err = os.Remove(jobPath); err != nil {
565 ctx.LogE("rx-remove", les, err, func(les LEs) string {
567 "Tossing freq %s/%s (%s): %s -> %s: removing",
568 sender.Name, pktName,
569 humanize.IBytes(pktSize), src, dst,
573 } else if ctx.HdrUsage {
574 os.Remove(JobPath2Hdr(jobPath))
577 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
580 append(sendmail[1:], ctx.NotifyFreq.To)...,
582 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
583 "Freq from %s: %s", sender.Name, src,
585 if err = cmd.Run(); err != nil {
586 ctx.LogE("rx-notify", les, err, func(les LEs) string {
588 "Tossing freq %s/%s (%s): %s -> %s: notifying",
589 sender.Name, pktName,
590 humanize.IBytes(pktSize), src, dst,
601 dst := new([MTHSize]byte)
602 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
603 nodeId := NodeId(*dst)
604 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
605 logMsg := func(les LEs) string {
607 "Tossing trns %s/%s (%s): %s",
608 sender.Name, pktName,
609 humanize.IBytes(pktSize),
613 node := ctx.Neigh[nodeId]
615 err = errors.New("unknown node")
616 ctx.LogE("rx-unknown", les, err, logMsg)
619 ctx.LogD("rx-tx", les, logMsg)
621 if len(node.Via) == 0 {
622 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
623 ctx.LogE("rx", les, err, func(les LEs) string {
624 return logMsg(les) + ": txing"
629 via := node.Via[:len(node.Via)-1]
630 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
631 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
632 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
636 if _, _, _, err = ctx.Tx(
640 int64(pktSize), 0, MaxFileSize,
645 ctx.LogE("rx", les, err, func(les LEs) string {
646 return logMsg(les) + ": txing"
652 ctx.LogI("rx", les, func(les LEs) string {
654 "Got transitional packet from %s to %s (%s)",
656 ctx.NodeName(&nodeId),
657 humanize.IBytes(pktSize),
660 if !opts.DryRun && jobPath != "" {
662 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
665 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
667 if err = DirSync(filepath.Dir(jobPath)); err != nil {
668 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
670 "Tossing file %s/%s (%s): %s: dirsyncing",
671 sender.Name, pktName,
672 humanize.IBytes(pktSize),
673 filepath.Base(jobPath),
680 if err = os.Remove(jobPath); err != nil {
681 ctx.LogE("rx", les, err, func(les LEs) string {
683 "Tossing trns %s/%s (%s): %s: removing",
684 sender.Name, pktName,
685 humanize.IBytes(pktSize),
686 ctx.NodeName(&nodeId),
690 } else if ctx.HdrUsage {
691 os.Remove(JobPath2Hdr(jobPath))
699 areaId := new(AreaId)
700 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
701 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
702 logMsg := func(les LEs) string {
704 "Tossing %s/%s (%s): area %s",
705 sender.Name, pktName,
706 humanize.IBytes(pktSize),
707 ctx.AreaName(areaId),
710 area := ctx.AreaId2Area[*areaId]
712 err = errors.New("unknown area")
713 ctx.LogE("rx-area-unknown", les, err, logMsg)
716 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
717 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
719 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
722 msgHashRaw := blake2b.Sum256(pktEncRaw)
723 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
724 les = append(les, LE{"AreaMsg", msgHash})
725 ctx.LogD("rx-area", les, logMsg)
728 for _, nodeId := range area.Subs {
729 node := ctx.Neigh[*nodeId]
730 lesEcho := append(les, LE{"Echo", nodeId})
731 seenDir := filepath.Join(
732 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
734 seenPath := filepath.Join(seenDir, msgHash)
735 logMsgNode := func(les LEs) string {
737 "%s: echoing to: %s", logMsg(les), node.Name,
740 if _, err := os.Stat(seenPath); err == nil {
741 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
742 return logMsgNode(les) + ": already sent"
746 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
749 for _, nodeId := range area.Subs {
750 node := ctx.Neigh[*nodeId]
751 lesEcho := append(les, LE{"Echo", nodeId})
752 seenDir := filepath.Join(
753 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
755 seenPath := filepath.Join(seenDir, msgHash)
756 logMsgNode := func(les LEs) string {
757 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
759 if _, err := os.Stat(seenPath); err == nil {
760 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
761 return logMsgNode(les) + ": already sent"
765 if nodeId != sender.Id && nodeId != pktEnc.Sender {
766 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
767 if _, _, _, err = ctx.Tx(
771 int64(pktSize), 0, MaxFileSize,
776 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
780 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
781 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
784 if fd, err := os.Create(seenPath); err == nil {
786 if err = DirSync(seenDir); err != nil {
787 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
791 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
794 return JobRepeatProcess
798 seenDir := filepath.Join(
799 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
801 seenPath := filepath.Join(seenDir, msgHash)
802 if _, err := os.Stat(seenPath); err == nil {
803 ctx.LogD("rx-area-seen", les, func(les LEs) string {
804 return logMsg(les) + ": already seen"
806 if !opts.DryRun && jobPath != "" {
807 if err = os.Remove(jobPath); err != nil {
808 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
810 "Tossing area %s/%s (%s): %s: removing",
811 sender.Name, pktName,
812 humanize.IBytes(pktSize),
817 } else if ctx.HdrUsage {
818 os.Remove(JobPath2Hdr(jobPath))
825 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
826 return logMsg(les) + ": no private key for decoding"
829 signatureVerify := true
830 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
831 if !area.AllowUnknown {
832 err = errors.New("unknown sender")
835 append(les, LE{"Sender", pktEnc.Sender}),
837 func(les LEs) string {
838 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
843 signatureVerify = false
845 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
846 copy(areaNodeOur.Id[:], area.Id[:])
847 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
851 Incoming: area.Incoming,
854 copy(areaNode.Id[:], area.Id[:])
855 pktName := fmt.Sprintf(
857 Base32Codec.EncodeToString(areaId[:]), msgHash,
860 pipeR, pipeW := io.Pipe()
861 errs := make(chan error, 1)
870 uint64(pktSizeWithoutEnc(int64(pktSize))),
876 _, _, _, err = PktEncRead(
885 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
886 pipeW.CloseWithError(err)
891 if err = <-errs; err != nil {
896 if !opts.DryRun && jobPath != "" {
897 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
898 ctx.LogE("rx-area-mkdir", les, err, logMsg)
901 if fd, err := os.Create(seenPath); err == nil {
903 if err = DirSync(seenDir); err != nil {
904 ctx.LogE("rx-area-dirsync", les, err, logMsg)
908 if err = os.Remove(jobPath); err != nil {
909 ctx.LogE("rx", les, err, func(les LEs) string {
911 "Tossing area %s/%s (%s): %s: removing",
912 sender.Name, pktName,
913 humanize.IBytes(pktSize),
918 } else if ctx.HdrUsage {
919 os.Remove(JobPath2Hdr(jobPath))
927 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
928 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
929 logMsg := func(les LEs) string {
930 return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
932 ctx.LogD("rx-ack", les, logMsg)
933 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
934 if _, err := os.Stat(pktPath); err == nil {
936 if err = os.Remove(pktPath); err != nil {
937 ctx.LogE("rx-ack", les, err, func(les LEs) string {
938 return logMsg(les) + ": removing packet"
941 } else if ctx.HdrUsage {
942 os.Remove(JobPath2Hdr(pktPath))
946 ctx.LogD("rx-ack", les, func(les LEs) string {
947 return logMsg(les) + ": already disappeared"
950 if !opts.DryRun && opts.DoSeen {
951 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
954 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
956 if err = DirSync(filepath.Dir(jobPath)); err != nil {
957 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
959 "Tossing file %s/%s (%s): %s: dirsyncing",
960 sender.Name, pktName,
961 humanize.IBytes(pktSize),
962 filepath.Base(jobPath),
970 if err = os.Remove(jobPath); err != nil {
971 ctx.LogE("rx", les, err, func(les LEs) string {
972 return logMsg(les) + ": removing job"
975 } else if ctx.HdrUsage {
976 os.Remove(JobPath2Hdr(jobPath))
979 ctx.LogI("rx", les, func(les LEs) string {
980 return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
984 err = errors.New("unknown type")
986 "rx-type-unknown", les, err,
987 func(les LEs) string {
989 "Tossing %s/%s (%s)",
990 sender.Name, pktName, humanize.IBytes(pktSize),
999 func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
1000 dirLock, err := ctx.LockDir(nodeId, "toss")
1004 defer ctx.UnlockDir(dirLock)
1006 decompressor, err := zstd.NewReader(nil)
1010 defer decompressor.Close()
1011 for job := range ctx.Jobs(nodeId, xx) {
1012 pktName := filepath.Base(job.Path)
1014 {"Node", job.PktEnc.Sender},
1016 {"Nice", int(job.PktEnc.Nice)},
1018 if job.PktEnc.Nice > opts.Nice {
1019 ctx.LogD("rx-too-nice", les, func(les LEs) string {
1021 "Tossing %s/%s: too nice: %s",
1022 ctx.NodeName(job.PktEnc.Sender), pktName,
1023 NicenessFmt(job.PktEnc.Nice),
1028 fd, err := os.Open(job.Path)
1030 ctx.LogE("rx-open", les, err, func(les LEs) string {
1032 "Tossing %s/%s: opening %s",
1033 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
1039 sender := ctx.Neigh[*job.PktEnc.Sender]
1041 err := errors.New("unknown node")
1042 ctx.LogE("rx-open", les, err, func(les LEs) string {
1045 ctx.NodeName(job.PktEnc.Sender), pktName,
1051 errs := make(chan error, 1)
1052 var sharedKey []byte
1054 pipeR, pipeW := io.Pipe()
1063 uint64(pktSizeWithoutEnc(job.Size)),
1069 pipeWB := bufio.NewWriter(pipeW)
1070 sharedKey, _, _, err = PktEncRead(
1073 bufio.NewReaderSize(fd, MTHBlockSize),
1079 pipeW.CloseWithError(err)
1081 if err := pipeWB.Flush(); err != nil {
1082 pipeW.CloseWithError(err)
1092 if err = <-errs; err == JobRepeatProcess {
1093 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1094 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1096 "Tossing %s/%s: can not seek",
1097 ctx.NodeName(job.PktEnc.Sender),
1105 } else if err != nil {
1113 func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
1114 dw, err := ctx.NewDirWatcher(
1115 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1121 finish := make(chan struct{})
1122 badCode := make(chan bool)
1132 bad = !ctx.Toss(nodeId, TRx, opts) || bad
1136 return finish, badCode