2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
// TossOpts holds the options passed to Toss and AutoToss.
// Toss compares each job's niceness against the Nice field.
// NOTE(review): GenACK, DryRun and DoSeen are read from an "opts" value in
// the packet-processing code; presumably that is this type -- confirm.
50 type TossOpts struct {
// jobPath2Seen maps a job path to its "seen" marker path: the same base
// name, placed inside the SeenDir subdirectory of the job's own directory.
63 func jobPath2Seen(jobPath string) string {
64 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
// newNotification builds a plain-text e-mail message and returns it as a
// reader. The From header is taken from fromTo.From, the subject is
// MIME B-encoded as UTF-8, and the body is sent base64-encoded.
67 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
69 "From: " + fromTo.From,
// B-encoding keeps non-ASCII subjects valid inside a mail header.
71 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
77 "Content-Type: text/plain; charset=utf-8",
78 "Content-Transfer-Encoding: base64",
// The body is emitted as a single base64 (standard alphabet) string.
80 base64.StdEncoding.EncodeToString(body),
// The collected lines are joined with bare "\n" separators.
83 return strings.NewReader(strings.Join(lines, "\n"))
// pktSizeWithoutEnc reduces an encrypted packet size by the fixed overheads
// (PktEncOverhead, PktOverhead, PktSizeOverhead) and by one poly1305 tag
// for every encrypted block.
// NOTE(review): presumably the adjusted pktSize is what gets returned -- confirm.
86 func pktSizeWithoutEnc(pktSize int64) int64 {
87 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
// Number of complete (block + tag) units in the remaining size.
88 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
// A non-zero remainder means a trailing partial block, which carries its own tag.
89 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
90 pktSize -= poly1305.TagSize
// Drop one tag for each complete block.
92 pktSize -= pktSizeBlocks * poly1305.TagSize
// JobRepeatProcess is a sentinel error. The area-packet handling returns it,
// and Toss reacts to it by seeking the job file back to its start.
96 var JobRepeatProcess = errors.New("needs processing repeat")
100 pipeR *io.PipeReader,
107 decompressor *zstd.Decoder,
111 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
113 _, err := xdr.Unmarshal(pipeR, &pkt)
115 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
116 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
120 les = append(les, LE{"Size", int64(pktSize)})
121 ctx.LogD("rx", les, func(les LEs) string {
123 "Tossing %s/%s (%s)",
124 sender.Name, pktName,
125 humanize.IBytes(pktSize),
128 if opts.GenACK && pkt.Type != PktTypeACK {
129 newPktName, err := ctx.TxACK(
130 sender, sender.ACKNice, pktName, sender.ACKMinSize,
133 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
134 return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName)
138 ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir)
139 os.MkdirAll(ackDir, os.FileMode(0777))
140 if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil {
142 if err = DirSync(ackDir); err != nil {
143 ctx.LogE("rx-genack", les, err, func(les LEs) string {
144 return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
149 ctx.LogE("rx-genack", les, err, func(les LEs) string {
150 return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
156 case PktTypeExec, PktTypeExecFat:
160 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
161 handle := string(path[0])
162 args := make([]string, 0, len(path)-1)
163 for _, p := range path[1:] {
164 args = append(args, string(p))
166 argsStr := strings.Join(append([]string{handle}, args...), " ")
167 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
168 cmdline := sender.Exec[handle]
169 if len(cmdline) == 0 {
170 err = errors.New("No handle found")
172 "rx-no-handle", les, err,
173 func(les LEs) string {
175 "Tossing exec %s/%s (%s): %s",
176 sender.Name, pktName,
177 humanize.IBytes(pktSize), argsStr,
183 if pkt.Type == PktTypeExec {
184 if err = decompressor.Reset(pipeR); err != nil {
189 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
192 "NNCP_SELF="+ctx.Self.Id.String(),
193 "NNCP_SENDER="+sender.Id.String(),
194 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
196 if pkt.Type == PktTypeExec {
197 cmd.Stdin = decompressor
201 output, err := cmd.CombinedOutput()
203 les = append(les, LE{"Output", strings.Split(
204 strings.Trim(string(output), "\n"), "\n"),
206 ctx.LogE("rx-handle", les, err, func(les LEs) string {
208 "Tossing exec %s/%s (%s): %s: handling",
209 sender.Name, pktName,
210 humanize.IBytes(uint64(pktSize)), argsStr,
215 if len(sendmail) > 0 && ctx.NotifyExec != nil {
216 notify := ctx.NotifyExec[sender.Name+"."+handle]
218 notify = ctx.NotifyExec["*."+handle]
223 append(sendmail[1:], notify.To)...,
225 cmd.Stdin = newNotification(notify, fmt.Sprintf(
226 "Exec from %s: %s", sender.Name, argsStr,
228 if err = cmd.Run(); err != nil {
229 ctx.LogE("rx-notify", les, err, func(les LEs) string {
231 "Tossing exec %s/%s (%s): %s: notifying",
232 sender.Name, pktName,
233 humanize.IBytes(pktSize), argsStr,
240 ctx.LogI("rx", les, func(les LEs) string {
242 "Got exec from %s to %s (%s)",
243 sender.Name, argsStr,
244 humanize.IBytes(pktSize),
247 if !opts.DryRun && jobPath != "" {
249 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
252 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
254 if err = DirSync(filepath.Dir(jobPath)); err != nil {
255 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
257 "Tossing file %s/%s (%s): %s: dirsyncing",
258 sender.Name, pktName,
259 humanize.IBytes(pktSize),
260 filepath.Base(jobPath),
267 if err = os.Remove(jobPath); err != nil {
268 ctx.LogE("rx-notify", les, err, func(les LEs) string {
270 "Tossing exec %s/%s (%s): %s: notifying",
271 sender.Name, pktName,
272 humanize.IBytes(pktSize), argsStr,
276 } else if ctx.HdrUsage {
277 os.Remove(JobPath2Hdr(jobPath))
285 dst := string(pkt.Path[:int(pkt.PathLen)])
286 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
287 if filepath.IsAbs(dst) {
288 err = errors.New("non-relative destination path")
290 "rx-non-rel", les, err,
291 func(les LEs) string {
293 "Tossing file %s/%s (%s): %s",
294 sender.Name, pktName,
295 humanize.IBytes(pktSize), dst,
301 incoming := sender.Incoming
303 err = errors.New("incoming is not allowed")
305 "rx-no-incoming", les, err,
306 func(les LEs) string {
308 "Tossing file %s/%s (%s): %s",
309 sender.Name, pktName,
310 humanize.IBytes(pktSize), dst,
316 dir := filepath.Join(*incoming, path.Dir(dst))
317 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
318 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
320 "Tossing file %s/%s (%s): %s: mkdir",
321 sender.Name, pktName,
322 humanize.IBytes(pktSize), dst,
328 tmp, err := TempFile(dir, "file")
330 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
332 "Tossing file %s/%s (%s): %s: mktemp",
333 sender.Name, pktName,
334 humanize.IBytes(pktSize), dst,
339 les = append(les, LE{"Tmp", tmp.Name()})
340 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
342 "Tossing file %s/%s (%s): %s: created: %s",
343 sender.Name, pktName,
344 humanize.IBytes(pktSize), dst, tmp.Name(),
347 bufW := bufio.NewWriter(tmp)
348 if _, err = CopyProgressed(
349 bufW, pipeR, "Rx file",
350 append(les, LE{"FullSize", int64(pktSize)}),
353 ctx.LogE("rx-copy", les, err, func(les LEs) string {
355 "Tossing file %s/%s (%s): %s: copying",
356 sender.Name, pktName,
357 humanize.IBytes(pktSize), dst,
362 if err = bufW.Flush(); err != nil {
364 ctx.LogE("rx-flush", les, err, func(les LEs) string {
366 "Tossing file %s/%s (%s): %s: flushing",
367 sender.Name, pktName,
368 humanize.IBytes(pktSize), dst,
374 if err = tmp.Sync(); err != nil {
376 ctx.LogE("rx-sync", les, err, func(les LEs) string {
378 "Tossing file %s/%s (%s): %s: syncing",
379 sender.Name, pktName,
380 humanize.IBytes(pktSize), dst,
386 if err = tmp.Close(); err != nil {
387 ctx.LogE("rx-close", les, err, func(les LEs) string {
389 "Tossing file %s/%s (%s): %s: closing",
390 sender.Name, pktName,
391 humanize.IBytes(pktSize), dst,
396 dstPathOrig := filepath.Join(*incoming, dst)
397 dstPath := dstPathOrig
400 if _, err = os.Stat(dstPath); err != nil {
401 if errors.Is(err, fs.ErrNotExist) {
404 ctx.LogE("rx-stat", les, err, func(les LEs) string {
406 "Tossing file %s/%s (%s): %s: stating: %s",
407 sender.Name, pktName,
408 humanize.IBytes(pktSize), dst, dstPath,
413 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
416 if err = os.Rename(tmp.Name(), dstPath); err != nil {
417 ctx.LogE("rx-rename", les, err, func(les LEs) string {
419 "Tossing file %s/%s (%s): %s: renaming",
420 sender.Name, pktName,
421 humanize.IBytes(pktSize), dst,
426 if err = DirSync(*incoming); err != nil {
427 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
429 "Tossing file %s/%s (%s): %s: dirsyncing",
430 sender.Name, pktName,
431 humanize.IBytes(pktSize), dst,
436 les = les[:len(les)-1] // delete Tmp
438 ctx.LogI("rx", les, func(les LEs) string {
440 "Got file %s (%s) from %s",
441 dst, humanize.IBytes(pktSize), sender.Name,
447 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
450 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
452 if err = DirSync(filepath.Dir(jobPath)); err != nil {
453 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
455 "Tossing file %s/%s (%s): %s: dirsyncing",
456 sender.Name, pktName,
457 humanize.IBytes(pktSize),
458 filepath.Base(jobPath),
465 if err = os.Remove(jobPath); err != nil {
466 ctx.LogE("rx-remove", les, err, func(les LEs) string {
468 "Tossing file %s/%s (%s): %s: removing",
469 sender.Name, pktName,
470 humanize.IBytes(pktSize), dst,
474 } else if ctx.HdrUsage {
475 os.Remove(JobPath2Hdr(jobPath))
478 if len(sendmail) > 0 && ctx.NotifyFile != nil {
481 append(sendmail[1:], ctx.NotifyFile.To)...,
483 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
484 "File from %s: %s (%s)",
485 sender.Name, dst, humanize.IBytes(pktSize),
487 if err = cmd.Run(); err != nil {
488 ctx.LogE("rx-notify", les, err, func(les LEs) string {
490 "Tossing file %s/%s (%s): %s: notifying",
491 sender.Name, pktName,
492 humanize.IBytes(pktSize), dst,
503 src := string(pkt.Path[:int(pkt.PathLen)])
504 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
505 if filepath.IsAbs(src) {
506 err = errors.New("non-relative source path")
508 "rx-non-rel", les, err,
509 func(les LEs) string {
511 "Tossing freq %s/%s (%s): %s: notifying",
512 sender.Name, pktName,
513 humanize.IBytes(pktSize), src,
519 dstRaw, err := io.ReadAll(pipeR)
521 ctx.LogE("rx-read", les, err, func(les LEs) string {
523 "Tossing freq %s/%s (%s): %s: reading",
524 sender.Name, pktName,
525 humanize.IBytes(pktSize), src,
530 dst := string(dstRaw)
531 les = append(les, LE{"Dst", dst})
532 freqPath := sender.FreqPath
534 err = errors.New("freqing is not allowed")
536 "rx-no-freq", les, err,
537 func(les LEs) string {
539 "Tossing freq %s/%s (%s): %s -> %s",
540 sender.Name, pktName,
541 humanize.IBytes(pktSize), src, dst,
551 filepath.Join(*freqPath, src),
559 ctx.LogE("rx-tx", les, err, func(les LEs) string {
561 "Tossing freq %s/%s (%s): %s -> %s: txing",
562 sender.Name, pktName,
563 humanize.IBytes(pktSize), src, dst,
569 ctx.LogI("rx", les, func(les LEs) string {
570 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
575 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
578 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
580 if err = DirSync(filepath.Dir(jobPath)); err != nil {
581 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
583 "Tossing file %s/%s (%s): %s: dirsyncing",
584 sender.Name, pktName,
585 humanize.IBytes(pktSize),
586 filepath.Base(jobPath),
593 if err = os.Remove(jobPath); err != nil {
594 ctx.LogE("rx-remove", les, err, func(les LEs) string {
596 "Tossing freq %s/%s (%s): %s -> %s: removing",
597 sender.Name, pktName,
598 humanize.IBytes(pktSize), src, dst,
602 } else if ctx.HdrUsage {
603 os.Remove(JobPath2Hdr(jobPath))
606 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
609 append(sendmail[1:], ctx.NotifyFreq.To)...,
611 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
612 "Freq from %s: %s", sender.Name, src,
614 if err = cmd.Run(); err != nil {
615 ctx.LogE("rx-notify", les, err, func(les LEs) string {
617 "Tossing freq %s/%s (%s): %s -> %s: notifying",
618 sender.Name, pktName,
619 humanize.IBytes(pktSize), src, dst,
630 dst := new([MTHSize]byte)
631 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
632 nodeId := NodeId(*dst)
633 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
634 logMsg := func(les LEs) string {
636 "Tossing trns %s/%s (%s): %s",
637 sender.Name, pktName,
638 humanize.IBytes(pktSize),
642 node := ctx.Neigh[nodeId]
644 err = errors.New("unknown node")
645 ctx.LogE("rx-unknown", les, err, logMsg)
648 ctx.LogD("rx-tx", les, logMsg)
650 if len(node.Via) == 0 {
651 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
652 ctx.LogE("rx", les, err, func(les LEs) string {
653 return logMsg(les) + ": txing"
658 via := node.Via[:len(node.Via)-1]
659 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
660 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
661 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
665 if _, _, _, err = ctx.Tx(
669 int64(pktSize), 0, MaxFileSize,
674 ctx.LogE("rx", les, err, func(les LEs) string {
675 return logMsg(les) + ": txing"
681 ctx.LogI("rx", les, func(les LEs) string {
683 "Got transitional packet from %s to %s (%s)",
685 ctx.NodeName(&nodeId),
686 humanize.IBytes(pktSize),
689 if !opts.DryRun && jobPath != "" {
691 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
694 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
696 if err = DirSync(filepath.Dir(jobPath)); err != nil {
697 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
699 "Tossing file %s/%s (%s): %s: dirsyncing",
700 sender.Name, pktName,
701 humanize.IBytes(pktSize),
702 filepath.Base(jobPath),
709 if err = os.Remove(jobPath); err != nil {
710 ctx.LogE("rx", les, err, func(les LEs) string {
712 "Tossing trns %s/%s (%s): %s: removing",
713 sender.Name, pktName,
714 humanize.IBytes(pktSize),
715 ctx.NodeName(&nodeId),
719 } else if ctx.HdrUsage {
720 os.Remove(JobPath2Hdr(jobPath))
728 areaId := new(AreaId)
729 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
730 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
731 logMsg := func(les LEs) string {
733 "Tossing %s/%s (%s): area %s",
734 sender.Name, pktName,
735 humanize.IBytes(pktSize),
736 ctx.AreaName(areaId),
739 area := ctx.AreaId2Area[*areaId]
741 err = errors.New("unknown area")
742 ctx.LogE("rx-area-unknown", les, err, logMsg)
745 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
746 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
748 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
751 msgHashRaw := blake2b.Sum256(pktEncRaw)
752 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
753 les = append(les, LE{"AreaMsg", msgHash})
754 ctx.LogD("rx-area", les, logMsg)
757 for _, nodeId := range area.Subs {
758 node := ctx.Neigh[*nodeId]
759 lesEcho := append(les, LE{"Echo", nodeId})
760 seenDir := filepath.Join(
761 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
763 seenPath := filepath.Join(seenDir, msgHash)
764 logMsgNode := func(les LEs) string {
766 "%s: echoing to: %s", logMsg(les), node.Name,
769 if _, err := os.Stat(seenPath); err == nil {
770 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
771 return logMsgNode(les) + ": already sent"
775 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
778 for _, nodeId := range area.Subs {
779 node := ctx.Neigh[*nodeId]
780 lesEcho := append(les, LE{"Echo", nodeId})
781 seenDir := filepath.Join(
782 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
784 seenPath := filepath.Join(seenDir, msgHash)
785 logMsgNode := func(les LEs) string {
786 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
788 if _, err := os.Stat(seenPath); err == nil {
789 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
790 return logMsgNode(les) + ": already sent"
794 if nodeId != sender.Id && nodeId != pktEnc.Sender {
795 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
796 if _, _, _, err = ctx.Tx(
800 int64(pktSize), 0, MaxFileSize,
805 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
809 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
810 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
813 if fd, err := os.Create(seenPath); err == nil {
815 if err = DirSync(seenDir); err != nil {
816 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
820 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
823 return JobRepeatProcess
827 seenDir := filepath.Join(
828 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
830 seenPath := filepath.Join(seenDir, msgHash)
831 if _, err := os.Stat(seenPath); err == nil {
832 ctx.LogD("rx-area-seen", les, func(les LEs) string {
833 return logMsg(les) + ": already seen"
835 if !opts.DryRun && jobPath != "" {
836 if err = os.Remove(jobPath); err != nil {
837 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
839 "Tossing area %s/%s (%s): %s: removing",
840 sender.Name, pktName,
841 humanize.IBytes(pktSize),
846 } else if ctx.HdrUsage {
847 os.Remove(JobPath2Hdr(jobPath))
854 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
855 return logMsg(les) + ": no private key for decoding"
858 signatureVerify := true
859 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
860 if !area.AllowUnknown {
861 err = errors.New("unknown sender")
864 append(les, LE{"Sender", pktEnc.Sender}),
866 func(les LEs) string {
867 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
872 signatureVerify = false
874 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
875 copy(areaNodeOur.Id[:], area.Id[:])
876 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
880 Incoming: area.Incoming,
883 copy(areaNode.Id[:], area.Id[:])
884 pktName := fmt.Sprintf(
886 Base32Codec.EncodeToString(areaId[:]), msgHash,
889 pipeR, pipeW := io.Pipe()
890 errs := make(chan error, 1)
899 uint64(pktSizeWithoutEnc(int64(pktSize))),
905 _, _, _, err = PktEncRead(
914 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
915 pipeW.CloseWithError(err)
920 if err = <-errs; err != nil {
925 if !opts.DryRun && jobPath != "" {
926 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
927 ctx.LogE("rx-area-mkdir", les, err, logMsg)
930 if fd, err := os.Create(seenPath); err == nil {
932 if err = DirSync(seenDir); err != nil {
933 ctx.LogE("rx-area-dirsync", les, err, logMsg)
937 if err = os.Remove(jobPath); err != nil {
938 ctx.LogE("rx", les, err, func(les LEs) string {
940 "Tossing area %s/%s (%s): %s: removing",
941 sender.Name, pktName,
942 humanize.IBytes(pktSize),
947 } else if ctx.HdrUsage {
948 os.Remove(JobPath2Hdr(jobPath))
956 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
957 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
958 logMsg := func(les LEs) string {
959 return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
961 ctx.LogD("rx-ack", les, logMsg)
962 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
963 if _, err := os.Stat(pktPath); err == nil {
965 if err = os.Remove(pktPath); err != nil {
966 ctx.LogE("rx-ack", les, err, func(les LEs) string {
967 return logMsg(les) + ": removing packet"
970 } else if ctx.HdrUsage {
971 os.Remove(JobPath2Hdr(pktPath))
975 ctx.LogD("rx-ack", les, func(les LEs) string {
976 return logMsg(les) + ": already disappeared"
979 if !opts.DryRun && opts.DoSeen {
980 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
983 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
985 if err = DirSync(filepath.Dir(jobPath)); err != nil {
986 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
988 "Tossing file %s/%s (%s): %s: dirsyncing",
989 sender.Name, pktName,
990 humanize.IBytes(pktSize),
991 filepath.Base(jobPath),
999 if err = os.Remove(jobPath); err != nil {
1000 ctx.LogE("rx", les, err, func(les LEs) string {
1001 return logMsg(les) + ": removing job"
1004 } else if ctx.HdrUsage {
1005 os.Remove(JobPath2Hdr(jobPath))
1008 ctx.LogI("rx", les, func(les LEs) string {
1009 return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
1013 err = errors.New("unknown type")
1015 "rx-type-unknown", les, err,
1016 func(les LEs) string {
1018 "Tossing %s/%s (%s)",
1019 sender.Name, pktName, humanize.IBytes(pktSize),
// Toss walks the jobs of the given node in the xx direction and processes
// each packet. It holds the node's "toss" directory lock for the whole call
// and creates a single zstd decoder before the job loop.
// NOTE(review): AutoToss treats a false result as "bad", so true presumably
// means every job was processed without error -- confirm.
1028 func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
1029 dirLock, err := ctx.LockDir(nodeId, "toss")
1033 defer ctx.UnlockDir(dirLock)
// The decoder is created with no input stream and closed on return.
1035 decompressor, err := zstd.NewReader(nil)
1039 defer decompressor.Close()
1040 for job := range ctx.Jobs(nodeId, xx) {
1041 pktName := filepath.Base(job.Path)
1043 {"Node", job.PktEnc.Sender},
1045 {"Nice", int(job.PktEnc.Nice)},
// Jobs nicer than the configured threshold are only reported at debug level.
1047 if job.PktEnc.Nice > opts.Nice {
1048 ctx.LogD("rx-too-nice", les, func(les LEs) string {
1050 "Tossing %s/%s: too nice: %s",
1051 ctx.NodeName(job.PktEnc.Sender), pktName,
1052 NicenessFmt(job.PktEnc.Nice),
1057 fd, err := os.Open(job.Path)
1059 ctx.LogE("rx-open", les, err, func(les LEs) string {
1061 "Tossing %s/%s: opening %s",
1062 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
// The packet's sender is looked up among the known neighbours.
1068 sender := ctx.Neigh[*job.PktEnc.Sender]
1070 err := errors.New("unknown node")
1071 ctx.LogE("rx-open", les, err, func(les LEs) string {
1074 ctx.NodeName(job.PktEnc.Sender), pktName,
// errs has capacity 1, so a single result can be sent without a waiting receiver.
1080 errs := make(chan error, 1)
1081 var sharedKey []byte
// PktEncRead's output travels through this in-memory pipe.
1083 pipeR, pipeW := io.Pipe()
1092 uint64(pktSizeWithoutEnc(job.Size)),
1098 pipeWB := bufio.NewWriter(pipeW)
1099 sharedKey, _, _, err = PktEncRead(
1102 bufio.NewReaderSize(fd, MTHBlockSize),
// Closing the write end with the error hands it to whoever reads pipeR.
1108 pipeW.CloseWithError(err)
// Buffered data must be flushed into the pipe; a flush failure is passed on the same way.
1110 if err := pipeWB.Flush(); err != nil {
1111 pipeW.CloseWithError(err)
// JobRepeatProcess is a request to go over the same file again, so rewind it.
1121 if err = <-errs; err == JobRepeatProcess {
1122 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1123 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1125 "Tossing %s/%s: can not seek",
1126 ctx.NodeName(job.PktEnc.Sender),
1134 } else if err != nil {
// AutoToss sets up a directory watcher on the node's TRx spool directory and
// runs Toss for that node with the given options. It returns two channels:
// finish (chan struct{}) and badCode (chan bool).
// NOTE(review): presumably the caller signals finish to stop and then reads
// the accumulated failure flag from badCode -- confirm.
1142 func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
1143 dw, err := ctx.NewDirWatcher(
1144 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
// Both channels are unbuffered.
1150 finish := make(chan struct{})
1151 badCode := make(chan bool)
// bad is sticky: once any Toss run returns false it stays true.
1161 bad = !ctx.Toss(nodeId, TRx, opts) || bad
1165 return finish, badCode