1 // NNCP -- Node to Node copy, utilities for store-and-forward data exchange
2 // Copyright (C) 2016-2024 Sergey Matveev <stargrave@stargrave.org>
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, version 3 of the License.
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
13 // You should have received a copy of the GNU General Public License
14 // along with this program. If not, see <http://www.gnu.org/licenses/>.
36 xdr "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "github.com/klauspost/compress/zstd"
39 "golang.org/x/crypto/blake2b"
40 "golang.org/x/crypto/poly1305"
// TossOpts carries the options handed to Toss and AutoToss.
// NOTE(review): the field list is outside this excerpt. Code in this file
// reads opts.GenACK, opts.DryRun, opts.DoSeen and opts.Nice, so presumably
// those are fields of this struct; confirm against the full definition.
48 type TossOpts struct {
// jobPath2Seen maps a job path to a path with the same base name located in
// the SeenDir subdirectory of the job's own directory.
61 func jobPath2Seen(jobPath string) string {
62 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
// newNotification assembles a mail-style text message and returns it as an
// io.Reader; callers in this file assign the result to cmd.Stdin.
//
// The visible parts are: a From header taken from fromTo.From, a Subject
// header holding subject in MIME B-encoding (UTF-8), Content-Type and
// Content-Transfer-Encoding headers announcing base64 text/plain, and body
// encoded with standard base64. All lines are joined with "\n".
//
// NOTE(review): the declaration of `lines`, any further headers and the
// condition (if any) guarding the body part are outside this excerpt.
65 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
67 "From: " + fromTo.From,
69 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
75 "Content-Type: text/plain; charset=utf-8",
76 "Content-Transfer-Encoding: base64",
78 base64.StdEncoding.EncodeToString(body),
81 return strings.NewReader(strings.Join(lines, "\n"))
// pktSizeWithoutEnc reduces pktSize by the fixed overheads (PktEncOverhead,
// PktOverhead, PktSizeOverhead) and by one poly1305 tag per encrypted block.
// NOTE(review): the return statement is outside this excerpt; presumably the
// adjusted pktSize is what gets returned.
84 func pktSizeWithoutEnc(pktSize int64) int64 {
85 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
// Number of complete (EncBlkSize + tag) units in what remains.
86 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
// A non-zero remainder means one more, shorter block that carries its own tag.
87 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
88 pktSize -= poly1305.TagSize
// Drop the tag of every complete block.
90 pktSize -= pktSizeBlocks * poly1305.TagSize
// JobRepeatProcess is a sentinel error. In this file it is returned after the
// loop that echoes an area packet to area.Subs, and Toss compares against it:
// on a match Toss seeks the job file back to its start.
// NOTE(review): presumably Toss then processes the same job again; that part
// is outside this excerpt.
94 var JobRepeatProcess = errors.New("needs processing repeat")
// Fragment of the per-packet processing routine.
// NOTE(review): its signature and several statements are outside this
// excerpt; only the trailing decompressor parameter is visible here.
105 decompressor *zstd.Decoder,
// Command line stored under the "sendmail" key of our own node's Exec entry;
// later code sends notifications only when it is non-empty.
109 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
// Decode the packet header from the pipe using XDR.
111 _, err := xdr.Unmarshal(pipeR, &pkt)
113 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
114 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
118 les = append(les, LE{"Size", int64(pktSize)})
119 ctx.LogD("rx", les, func(les LEs) string {
121 "Tossing %s/%s (%s)",
122 sender.Name, pktName,
123 humanize.IBytes(pktSize),
// When ACK generation is requested, acknowledge every packet that is not
// itself an ACK.
126 if opts.GenACK && pkt.Type != PktTypeACK {
127 newPktName, err := ctx.TxACK(
128 sender, sender.ACKNice, pktName, sender.ACKMinSize,
// NOTE(review): this failure is logged under the "rx-unmarshal" tag although
// the message says "generating ACK"; the sibling errors below use
// "rx-genack". Looks like a copy-paste leftover.
131 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
132 return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName)
// Create a file named after the new ACK packet inside the sender's
// TTx/ACKDir spool directory.
136 ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir)
// NOTE(review): the MkdirAll error is discarded; a failure here only
// surfaces through the os.Create call that follows.
137 os.MkdirAll(ackDir, os.FileMode(0777))
138 if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil {
140 if err = DirSync(ackDir); err != nil {
141 ctx.LogE("rx-genack", les, err, func(les LEs) string {
142 return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
147 ctx.LogE("rx-genack", les, err, func(les LEs) string {
148 return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
// Exec packets: run the command line that the sender's Exec entry associates
// with the requested handle.
154 case PktTypeExec, PktTypeExecFat:
// pkt.Path holds NUL-separated fields: the handle first, then its arguments.
158 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
159 handle := string(path[0])
160 args := make([]string, 0, len(path)-1)
161 for _, p := range path[1:] {
162 args = append(args, string(p))
// Human-readable "handle arg1 arg2 ..." form, used in logs and notifications.
164 argsStr := strings.Join(append([]string{handle}, args...), " ")
165 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
// An empty command line for the handle is reported as "No handle found".
166 cmdline := sender.Exec[handle]
167 if len(cmdline) == 0 {
168 err = errors.New("No handle found")
170 "rx-no-handle", les, err,
171 func(les LEs) string {
173 "Tossing exec %s/%s (%s): %s",
174 sender.Name, pktName,
175 humanize.IBytes(pktSize), argsStr,
// Only PktTypeExec resets the zstd decompressor onto the pipe.
// NOTE(review): presumably PktTypeExecFat payloads are passed through
// unchanged; that branch is outside this excerpt.
181 if pkt.Type == PktTypeExec {
182 if err = decompressor.Reset(pipeR); err != nil {
// The packet's arguments are appended after the configured command's own.
187 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
// NOTE(review): these NAME=value strings presumably extend cmd.Env; the
// enclosing statement is outside this excerpt.
190 "NNCP_SELF="+ctx.Self.Id.String(),
191 "NNCP_SENDER="+sender.Id.String(),
192 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
194 if pkt.Type == PktTypeExec {
195 cmd.Stdin = decompressor
// Run the command, capturing stdout and stderr together.
199 output, err := cmd.CombinedOutput()
// The captured output is attached to the log entry as a list of lines.
201 les = append(les, LE{"Output", strings.Split(
202 strings.Trim(string(output), "\n"), "\n"),
204 ctx.LogE("rx-handle", les, err, func(les LEs) string {
206 "Tossing exec %s/%s (%s): %s: handling",
207 sender.Name, pktName,
208 humanize.IBytes(uint64(pktSize)), argsStr,
// Notification target: "<sender>.<handle>" first, then the "*.<handle>"
// wildcard entry.
213 if len(sendmail) > 0 && ctx.NotifyExec != nil {
214 notify := ctx.NotifyExec[sender.Name+"."+handle]
216 notify = ctx.NotifyExec["*."+handle]
221 append(sendmail[1:], notify.To)...,
223 cmd.Stdin = newNotification(notify, fmt.Sprintf(
224 "Exec from %s: %s", sender.Name, argsStr,
226 if err = cmd.Run(); err != nil {
227 ctx.LogE("rx-notify", les, err, func(les LEs) string {
229 "Tossing exec %s/%s (%s): %s: notifying",
230 sender.Name, pktName,
231 humanize.IBytes(pktSize), argsStr,
238 ctx.LogI("rx", les, func(les LEs) string {
240 "Got exec from %s to %s (%s)",
241 sender.Name, argsStr,
242 humanize.IBytes(pktSize),
// Post-processing of the job file is skipped on dry runs and when there is
// no job path.
245 if !opts.DryRun && jobPath != "" {
// Create the file returned by jobPath2Seen, then sync the job's directory.
247 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
250 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
252 if err = DirSync(filepath.Dir(jobPath)); err != nil {
253 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" inside the exec branch.
255 "Tossing file %s/%s (%s): %s: dirsyncing",
256 sender.Name, pktName,
257 humanize.IBytes(pktSize),
258 filepath.Base(jobPath),
// Remove the processed job.
// NOTE(review): a removal failure is logged with tag "rx-notify" and the
// suffix "notifying"; the file and freq branches use "rx-remove" and
// "removing" for the same step. Looks like a copy-paste leftover.
265 if err = os.Remove(jobPath); err != nil {
266 ctx.LogE("rx-notify", les, err, func(les LEs) string {
268 "Tossing exec %s/%s (%s): %s: notifying",
269 sender.Name, pktName,
270 humanize.IBytes(pktSize), argsStr,
// After a successful removal the job's header file is removed as well when
// header usage is enabled; that removal's error is ignored.
274 } else if ctx.HdrUsage {
275 os.Remove(JobPath2Hdr(jobPath))
// File packets: store the payload under the sender's incoming directory.
// pkt.Path carries the destination path.
283 dst := string(pkt.Path[:int(pkt.PathLen)])
284 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
// Absolute destinations are refused.
// NOTE(review): only IsAbs is checked in the visible code. filepath.Join
// cleans its result, so a dst containing ".." elements could resolve outside
// *incoming; confirm whether that is validated elsewhere.
285 if filepath.IsAbs(dst) {
286 err = errors.New("non-relative destination path")
288 "rx-non-rel", les, err,
289 func(les LEs) string {
291 "Tossing file %s/%s (%s): %s",
292 sender.Name, pktName,
293 humanize.IBytes(pktSize), dst,
// NOTE(review): the condition guarding "incoming is not allowed" is outside
// this excerpt; presumably it tests incoming for nil.
299 incoming := sender.Incoming
301 err = errors.New("incoming is not allowed")
303 "rx-no-incoming", les, err,
304 func(les LEs) string {
306 "Tossing file %s/%s (%s): %s",
307 sender.Name, pktName,
308 humanize.IBytes(pktSize), dst,
// Ensure the destination's directory exists under *incoming.
314 dir := filepath.Join(*incoming, path.Dir(dst))
315 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
316 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
318 "Tossing file %s/%s (%s): %s: mkdir",
319 sender.Name, pktName,
320 humanize.IBytes(pktSize), dst,
// The payload is first written to a temporary file in the same directory and
// renamed to its final name further below.
326 tmp, err := TempFile(dir, "file")
328 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
330 "Tossing file %s/%s (%s): %s: mktemp",
331 sender.Name, pktName,
332 humanize.IBytes(pktSize), dst,
// The "Tmp" log element is appended last; it is sliced off again after the
// rename.
337 les = append(les, LE{"Tmp", tmp.Name()})
338 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
340 "Tossing file %s/%s (%s): %s: created: %s",
341 sender.Name, pktName,
342 humanize.IBytes(pktSize), dst, tmp.Name(),
// Copy the pipe into the temporary file through a buffered writer.
345 bufW := bufio.NewWriter(tmp)
346 if _, err = CopyProgressed(
347 bufW, pipeR, "Rx file",
348 append(les, LE{"FullSize", int64(pktSize)}),
351 ctx.LogE("rx-copy", les, err, func(les LEs) string {
353 "Tossing file %s/%s (%s): %s: copying",
354 sender.Name, pktName,
355 humanize.IBytes(pktSize), dst,
// Flush the buffer, sync the file and close it before renaming.
360 if err = bufW.Flush(); err != nil {
362 ctx.LogE("rx-flush", les, err, func(les LEs) string {
364 "Tossing file %s/%s (%s): %s: flushing",
365 sender.Name, pktName,
366 humanize.IBytes(pktSize), dst,
372 if err = tmp.Sync(); err != nil {
374 ctx.LogE("rx-sync", les, err, func(les LEs) string {
376 "Tossing file %s/%s (%s): %s: syncing",
377 sender.Name, pktName,
378 humanize.IBytes(pktSize), dst,
384 if err = tmp.Close(); err != nil {
385 ctx.LogE("rx-close", les, err, func(les LEs) string {
387 "Tossing file %s/%s (%s): %s: closing",
388 sender.Name, pktName,
389 humanize.IBytes(pktSize), dst,
// Pick the final name: stat the candidate and, on a clash, try
// dstPathOrig + "." + counter.
// NOTE(review): the loop header and the counter update are outside this
// excerpt. Stat followed by Rename is not atomic, so a file created in
// between would be replaced by the rename.
394 dstPathOrig := filepath.Join(*incoming, dst)
395 dstPath := dstPathOrig
398 if _, err = os.Stat(dstPath); err != nil {
399 if errors.Is(err, fs.ErrNotExist) {
402 ctx.LogE("rx-stat", les, err, func(les LEs) string {
404 "Tossing file %s/%s (%s): %s: stating: %s",
405 sender.Name, pktName,
406 humanize.IBytes(pktSize), dst, dstPath,
411 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
414 if err = os.Rename(tmp.Name(), dstPath); err != nil {
415 ctx.LogE("rx-rename", les, err, func(les LEs) string {
417 "Tossing file %s/%s (%s): %s: renaming",
418 sender.Name, pktName,
419 humanize.IBytes(pktSize), dst,
// NOTE(review): this syncs *incoming, while the renamed entry lives in dir
// whenever dst has a directory component; confirm that this is intended.
424 if err = DirSync(*incoming); err != nil {
425 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
427 "Tossing file %s/%s (%s): %s: dirsyncing",
428 sender.Name, pktName,
429 humanize.IBytes(pktSize), dst,
434 les = les[:len(les)-1] // delete Tmp
436 ctx.LogI("rx", les, func(les LEs) string {
438 "Got file %s (%s) from %s",
439 dst, humanize.IBytes(pktSize), sender.Name,
// Create the file returned by jobPath2Seen, sync the job's directory, then
// remove the job itself.
445 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
448 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
450 if err = DirSync(filepath.Dir(jobPath)); err != nil {
451 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
453 "Tossing file %s/%s (%s): %s: dirsyncing",
454 sender.Name, pktName,
455 humanize.IBytes(pktSize),
456 filepath.Base(jobPath),
463 if err = os.Remove(jobPath); err != nil {
464 ctx.LogE("rx-remove", les, err, func(les LEs) string {
466 "Tossing file %s/%s (%s): %s: removing",
467 sender.Name, pktName,
468 humanize.IBytes(pktSize), dst,
// On successful removal the header file goes too when header usage is on.
472 } else if ctx.HdrUsage {
473 os.Remove(JobPath2Hdr(jobPath))
// Send a notification addressed to ctx.NotifyFile.To when both sendmail and
// NotifyFile are configured.
476 if len(sendmail) > 0 && ctx.NotifyFile != nil {
479 append(sendmail[1:], ctx.NotifyFile.To)...,
481 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
482 "File from %s: %s (%s)",
483 sender.Name, dst, humanize.IBytes(pktSize),
485 if err = cmd.Run(); err != nil {
486 ctx.LogE("rx-notify", les, err, func(les LEs) string {
488 "Tossing file %s/%s (%s): %s: notifying",
489 sender.Name, pktName,
490 humanize.IBytes(pktSize), dst,
// File request (freq) packets: pkt.Path names the requested source path and
// the packet payload names the destination.
501 src := string(pkt.Path[:int(pkt.PathLen)])
// := declares a new les that shadows the outer one for the rest of its scope.
502 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
// Absolute source paths are refused.
// NOTE(review): the message below ends in ": notifying" although nothing is
// being notified here; it looks copied from the notification error.
503 if filepath.IsAbs(src) {
504 err = errors.New("non-relative source path")
506 "rx-non-rel", les, err,
507 func(les LEs) string {
509 "Tossing freq %s/%s (%s): %s: notifying",
510 sender.Name, pktName,
511 humanize.IBytes(pktSize), src,
// The whole payload is read into memory and used as the destination name.
517 dstRaw, err := io.ReadAll(pipeR)
519 ctx.LogE("rx-read", les, err, func(les LEs) string {
521 "Tossing freq %s/%s (%s): %s: reading",
522 sender.Name, pktName,
523 humanize.IBytes(pktSize), src,
528 dst := string(dstRaw)
529 les = append(les, LE{"Dst", dst})
// NOTE(review): the condition guarding "freqing is not allowed" is outside
// this excerpt; presumably it tests freqPath for nil.
530 freqPath := sender.FreqPath
532 err = errors.New("freqing is not allowed")
534 "rx-no-freq", les, err,
535 func(les LEs) string {
537 "Tossing freq %s/%s (%s): %s -> %s",
538 sender.Name, pktName,
539 humanize.IBytes(pktSize), src, dst,
// The path that gets sent is src resolved below *freqPath.
// NOTE(review): as with file packets only IsAbs is checked in the visible
// code, so ".." elements in src could resolve outside *freqPath; confirm
// whether that is validated elsewhere.
549 filepath.Join(*freqPath, src),
557 ctx.LogE("rx-tx", les, err, func(les LEs) string {
559 "Tossing freq %s/%s (%s): %s -> %s: txing",
560 sender.Name, pktName,
561 humanize.IBytes(pktSize), src, dst,
567 ctx.LogI("rx", les, func(les LEs) string {
568 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
// Create the file returned by jobPath2Seen, sync the job's directory, then
// remove the job itself.
573 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
576 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
578 if err = DirSync(filepath.Dir(jobPath)); err != nil {
579 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" inside the freq branch.
581 "Tossing file %s/%s (%s): %s: dirsyncing",
582 sender.Name, pktName,
583 humanize.IBytes(pktSize),
584 filepath.Base(jobPath),
591 if err = os.Remove(jobPath); err != nil {
592 ctx.LogE("rx-remove", les, err, func(les LEs) string {
594 "Tossing freq %s/%s (%s): %s -> %s: removing",
595 sender.Name, pktName,
596 humanize.IBytes(pktSize), src, dst,
// On successful removal the header file goes too when header usage is on.
600 } else if ctx.HdrUsage {
601 os.Remove(JobPath2Hdr(jobPath))
// Send a notification addressed to ctx.NotifyFreq.To when both sendmail and
// NotifyFreq are configured.
604 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
607 append(sendmail[1:], ctx.NotifyFreq.To)...,
609 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
610 "Freq from %s: %s", sender.Name, src,
612 if err = cmd.Run(); err != nil {
613 ctx.LogE("rx-notify", les, err, func(les LEs) string {
615 "Tossing freq %s/%s (%s): %s -> %s: notifying",
616 sender.Name, pktName,
617 humanize.IBytes(pktSize), src, dst,
// Transitional (trns) packets: pkt.Path carries the id of the node the
// payload has to be passed on to.
628 dst := new([MTHSize]byte)
629 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
630 nodeId := NodeId(*dst)
// := declares a new les that shadows the outer one for the rest of its scope.
631 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
632 logMsg := func(les LEs) string {
634 "Tossing trns %s/%s (%s): %s",
635 sender.Name, pktName,
636 humanize.IBytes(pktSize),
// NOTE(review): the check that yields "unknown node" is outside this
// excerpt; presumably it tests the lookup result.
640 node := ctx.Neigh[nodeId]
642 err = errors.New("unknown node")
643 ctx.LogE("rx-unknown", les, err, logMsg)
646 ctx.LogD("rx-tx", les, logMsg)
// No Via entries: hand the payload to TxTrns for that node.
648 if len(node.Via) == 0 {
649 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
650 ctx.LogE("rx", les, err, func(les LEs) string {
651 return logMsg(les) + ": txing"
// With Via entries: the last entry becomes the node to transmit to, the
// remaining entries become its Via list, and a new PktTypeTrns packet
// addressed to nodeId is built for it.
// NOTE(review): the ctx.Neigh lookup of that hop is not nil-checked in the
// visible code; a nil result would be dereferenced by node.Id on the next
// line.
656 via := node.Via[:len(node.Via)-1]
657 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
658 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
659 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
663 if _, _, _, err = ctx.Tx(
667 int64(pktSize), 0, MaxFileSize,
672 ctx.LogE("rx", les, err, func(les LEs) string {
673 return logMsg(les) + ": txing"
679 ctx.LogI("rx", les, func(les LEs) string {
681 "Got transitional packet from %s to %s (%s)",
683 ctx.NodeName(&nodeId),
684 humanize.IBytes(pktSize),
// Post-processing of the job file is skipped on dry runs and when there is
// no job path.
687 if !opts.DryRun && jobPath != "" {
689 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
692 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
694 if err = DirSync(filepath.Dir(jobPath)); err != nil {
695 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" inside the trns branch.
697 "Tossing file %s/%s (%s): %s: dirsyncing",
698 sender.Name, pktName,
699 humanize.IBytes(pktSize),
700 filepath.Base(jobPath),
707 if err = os.Remove(jobPath); err != nil {
708 ctx.LogE("rx", les, err, func(les LEs) string {
710 "Tossing trns %s/%s (%s): %s: removing",
711 sender.Name, pktName,
712 humanize.IBytes(pktSize),
713 ctx.NodeName(&nodeId),
// On successful removal the header file goes too when header usage is on.
717 } else if ctx.HdrUsage {
718 os.Remove(JobPath2Hdr(jobPath))
// Area packets: pkt.Path carries the area id; the payload starts with an
// encrypted-packet header that is read below.
726 areaId := new(AreaId)
727 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
// := declares a new les that shadows the outer one for the rest of its scope.
728 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
729 logMsg := func(les LEs) string {
731 "Tossing %s/%s (%s): area %s",
732 sender.Name, pktName,
733 humanize.IBytes(pktSize),
734 ctx.AreaName(areaId),
// NOTE(review): the check that yields "unknown area" is outside this
// excerpt; presumably it tests the lookup result.
737 area := ctx.AreaId2Area[*areaId]
739 err = errors.New("unknown area")
740 ctx.LogE("rx-area-unknown", les, err, logMsg)
// Read the inner header from the pipe. fullPipeR puts the raw header bytes
// back in front of the rest of the pipe so the stream can be read again from
// its beginning. It is built before err is examined, which only constructs
// readers.
743 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
744 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
746 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
// The message identifier is the Base32 form of the BLAKE2b-256 hash of the
// raw header; it is used as the file name in the per-node and own seen
// directories below.
749 msgHashRaw := blake2b.Sum256(pktEncRaw)
750 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
751 les = append(les, LE{"AreaMsg", msgHash})
752 ctx.LogD("rx-area", les, logMsg)
// First pass over the subscribers: only logging is visible here.
// NOTE(review): the condition selecting this pass is outside this excerpt;
// presumably it is the dry-run path.
755 for _, nodeId := range area.Subs {
756 node := ctx.Neigh[*nodeId]
757 lesEcho := append(les, LE{"Echo", nodeId})
758 seenDir := filepath.Join(
759 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
761 seenPath := filepath.Join(seenDir, msgHash)
762 logMsgNode := func(les LEs) string {
764 "%s: echoing to: %s", logMsg(les), node.Name,
767 if _, err := os.Stat(seenPath); err == nil {
768 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
769 return logMsgNode(les) + ": already sent"
773 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
// Second pass over the subscribers: transmit the packet to each one that has
// no seen file for this message yet, then create that seen file.
776 for _, nodeId := range area.Subs {
777 node := ctx.Neigh[*nodeId]
778 lesEcho := append(les, LE{"Echo", nodeId})
779 seenDir := filepath.Join(
780 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
782 seenPath := filepath.Join(seenDir, msgHash)
783 logMsgNode := func(les LEs) string {
784 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
786 if _, err := os.Stat(seenPath); err == nil {
787 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
788 return logMsgNode(les) + ": already sent"
// NOTE(review): nodeId, sender.Id and pktEnc.Sender are pointers (nodeId and
// pktEnc.Sender are dereferenced elsewhere in this branch), so these
// comparisons test pointer identity, not equality of the ids. Equal ids held
// in different allocations compare as different; pktEnc.Sender comes from
// HdrRead and is presumably never the same pointer as an area.Subs entry.
// Confirm whether *nodeId != *sender.Id && *nodeId != *pktEnc.Sender was
// intended.
792 if nodeId != sender.Id && nodeId != pktEnc.Sender {
793 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
794 if _, _, _, err = ctx.Tx(
798 int64(pktSize), 0, MaxFileSize,
803 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
807 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
808 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
811 if fd, err := os.Create(seenPath); err == nil {
813 if err = DirSync(seenDir); err != nil {
// NOTE(review): this call passes les while the neighbouring log calls in the
// loop pass lesEcho, so the "Echo" element is missing from this entry.
814 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
818 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
// Ask the caller to process this job again.
821 return JobRepeatProcess
// Our own seen file for this message: if it exists the packet was already
// handled here, and the job is removed.
825 seenDir := filepath.Join(
826 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
828 seenPath := filepath.Join(seenDir, msgHash)
829 if _, err := os.Stat(seenPath); err == nil {
830 ctx.LogD("rx-area-seen", les, func(les LEs) string {
831 return logMsg(les) + ": already seen"
833 if !opts.DryRun && jobPath != "" {
834 if err = os.Remove(jobPath); err != nil {
835 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
837 "Tossing area %s/%s (%s): %s: removing",
838 sender.Name, pktName,
839 humanize.IBytes(pktSize),
844 } else if ctx.HdrUsage {
845 os.Remove(JobPath2Hdr(jobPath))
// NOTE(review): the condition leading to this log is outside this excerpt;
// presumably the area has no private key configured.
852 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
853 return logMsg(les) + ": no private key for decoding"
// A sender missing from ctx.Neigh is an error unless the area allows unknown
// senders; in that case signature verification is switched off.
856 signatureVerify := true
857 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
858 if !area.AllowUnknown {
859 err = errors.New("unknown sender")
862 append(les, LE{"Sender", pktEnc.Sender}),
864 func(les LEs) string {
865 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
870 signatureVerify = false
// Build a NodeOur whose Id and ExchPrv are copied from the area's Id and Prv.
872 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
873 copy(areaNodeOur.Id[:], area.Id[:])
874 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
878 Incoming: area.Incoming,
881 copy(areaNode.Id[:], area.Id[:])
// := declares a new pktName that shadows the outer one from here on.
882 pktName := fmt.Sprintf(
884 Base32Codec.EncodeToString(areaId[:]), msgHash,
// A fresh pipe, shadowing the outer pipeR, with an error channel of capacity
// one.
// NOTE(review): presumably the inner packet is processed from pipeR
// concurrently while PktEncRead writes into pipeW; the statements that set
// this up are outside this excerpt.
887 pipeR, pipeW := io.Pipe()
888 errs := make(chan error, 1)
897 uint64(pktSizeWithoutEnc(int64(pktSize))),
903 _, _, _, err = PktEncRead(
912 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
913 pipeW.CloseWithError(err)
// Receive the result sent on errs.
918 if err = <-errs; err != nil {
// Create our own seen file for this message, sync its directory, then remove
// the job.
923 if !opts.DryRun && jobPath != "" {
924 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
925 ctx.LogE("rx-area-mkdir", les, err, logMsg)
928 if fd, err := os.Create(seenPath); err == nil {
930 if err = DirSync(seenDir); err != nil {
931 ctx.LogE("rx-area-dirsync", les, err, logMsg)
935 if err = os.Remove(jobPath); err != nil {
936 ctx.LogE("rx", les, err, func(les LEs) string {
938 "Tossing area %s/%s (%s): %s: removing",
939 sender.Name, pktName,
940 humanize.IBytes(pktSize),
// On successful removal the header file goes too when header usage is on.
945 } else if ctx.HdrUsage {
946 os.Remove(JobPath2Hdr(jobPath))
// ACK packets: the first MTHSize bytes of pkt.Path, Base32-encoded, name a
// file in the sender's TTx spool directory, which is removed if it exists.
954 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
// := declares a new les that shadows the outer one for the rest of its scope.
955 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
956 logMsg := func(les LEs) string {
957 return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
959 ctx.LogD("rx-ack", les, logMsg)
960 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
// NOTE(review): any guard between the Stat and the Remove (for example a
// dry-run check) is outside this excerpt.
961 if _, err := os.Stat(pktPath); err == nil {
963 if err = os.Remove(pktPath); err != nil {
964 ctx.LogE("rx-ack", les, err, func(les LEs) string {
965 return logMsg(les) + ": removing packet"
// On successful removal the header file goes too when header usage is on.
968 } else if ctx.HdrUsage {
969 os.Remove(JobPath2Hdr(pktPath))
// A failing Stat is only logged at debug level.
973 ctx.LogD("rx-ack", les, func(les LEs) string {
974 return logMsg(les) + ": already disappeared"
// The seen file is created only when opts.DoSeen is set and this is not a
// dry run.
977 if !opts.DryRun && opts.DoSeen {
978 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
981 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
983 if err = DirSync(filepath.Dir(jobPath)); err != nil {
984 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" inside the ack branch.
986 "Tossing file %s/%s (%s): %s: dirsyncing",
987 sender.Name, pktName,
988 humanize.IBytes(pktSize),
989 filepath.Base(jobPath),
// Remove the ACK job itself.
997 if err = os.Remove(jobPath); err != nil {
998 ctx.LogE("rx", les, err, func(les LEs) string {
999 return logMsg(les) + ": removing job"
1002 } else if ctx.HdrUsage {
1003 os.Remove(JobPath2Hdr(jobPath))
1006 ctx.LogI("rx", les, func(les LEs) string {
1007 return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
// Any other packet type is logged as an "unknown type" error.
// NOTE(review): the enclosing default label and the return statement are
// outside this excerpt.
1011 err = errors.New("unknown type")
1013 "rx-type-unknown", les, err,
1014 func(les LEs) string {
1016 "Tossing %s/%s (%s)",
1017 sender.Name, pktName, humanize.IBytes(pktSize),
// Toss walks the jobs that ctx.Jobs yields for nodeId and direction xx and
// processes each of them, holding the node's "toss" directory lock for the
// whole run.
//
// Jobs whose niceness is greater than opts.Nice are skipped with a debug log.
// NOTE(review): the return statements are outside this excerpt. AutoToss
// treats a false result as "bad", so presumably true means every job was
// handled without error.
1026 func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
1027 dirLock, err := ctx.LockDir(nodeId, "toss")
1031 defer ctx.UnlockDir(dirLock)
// A single zstd decoder is created here, before the loop, and closed when
// Toss returns.
1033 decompressor, err := zstd.NewReader(nil)
1037 defer decompressor.Close()
1038 for job := range ctx.Jobs(nodeId, xx) {
1039 pktName := filepath.Base(job.Path)
1041 {"Node", job.PktEnc.Sender},
1043 {"Nice", int(job.PktEnc.Nice)},
1045 if job.PktEnc.Nice > opts.Nice {
1046 ctx.LogD("rx-too-nice", les, func(les LEs) string {
1048 "Tossing %s/%s: too nice: %s",
1049 ctx.NodeName(job.PktEnc.Sender), pktName,
1050 NicenessFmt(job.PktEnc.Nice),
1055 fd, err := os.Open(job.Path)
1057 ctx.LogE("rx-open", les, err, func(les LEs) string {
1059 "Tossing %s/%s: opening %s",
1060 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
// The sender is looked up among the known neighbours.
// NOTE(review): the check that yields "unknown node" is outside this
// excerpt; that error reuses the "rx-open" log tag.
1066 sender := ctx.Neigh[*job.PktEnc.Sender]
1068 err := errors.New("unknown node")
1069 ctx.LogE("rx-open", les, err, func(les LEs) string {
1072 ctx.NodeName(job.PktEnc.Sender), pktName,
// errs has capacity one and its value is received further below; the pipe's
// write end is fed by PktEncRead.
// NOTE(review): presumably errs carries the result of processing the pipe's
// read end concurrently; the statements that set this up are outside this
// excerpt.
1078 errs := make(chan error, 1)
1079 var sharedKey []byte
1081 pipeR, pipeW := io.Pipe()
1090 uint64(pktSizeWithoutEnc(job.Size)),
// PktEncRead writes through a buffered writer on pipeW. On error the pipe is
// closed with that error; a failing flush closes the pipe with the flush
// error.
1096 pipeWB := bufio.NewWriter(pipeW)
1097 sharedKey, _, _, err = PktEncRead(
1100 bufio.NewReaderSize(fd, MTHBlockSize),
1106 pipeW.CloseWithError(err)
1108 if err := pipeWB.Flush(); err != nil {
1109 pipeW.CloseWithError(err)
// JobRepeatProcess from the processing side: rewind the job file to its
// start.
// NOTE(review): the comparison uses == rather than errors.Is, so a wrapped
// JobRepeatProcess would not match.
1119 if err = <-errs; err == JobRepeatProcess {
1120 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1121 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1123 "Tossing %s/%s: can not seek",
1124 ctx.NodeName(job.PktEnc.Sender),
1132 } else if err != nil {
// AutoToss creates a directory watcher on the node's TRx spool directory and
// returns two channels: finish (chan struct{}) and badCode (chan bool).
// It calls Toss for nodeId with direction TRx and accumulates any false
// result in bad.
// NOTE(review): the goroutine, the handling of the NewDirWatcher error and
// the exact use of both channels are outside this excerpt. Presumably finish
// is used to stop the watcher and badCode reports the accumulated bad flag.
1140 func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
1141 dw, err := ctx.NewDirWatcher(
1142 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1148 finish := make(chan struct{})
1149 badCode := make(chan bool)
// bad stays true once any Toss call has returned false.
1159 bad = !ctx.Toss(nodeId, TRx, opts) || bad
1163 return finish, badCode