2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
49 func jobPath2Seen(jobPath string) string {
50 return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
// newNotification renders a notification e-mail as text and returns it as a
// reader; the callers in this file assign the result to the Stdin of the
// sendmail command.
//
// The visible lines emit a From header taken from fromTo.From, a Subject
// header carrying subject in MIME B-encoding (UTF-8), text/plain and base64
// Content-* headers, and body in standard base64. All lines are joined with
// a bare "\n".
//
// NOTE(review): several lines of this function (including the declaration of
// lines and whatever guards the Content-* headers) are missing from this
// excerpt, so the exact header set and conditions should be confirmed
// against the full file.
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
55 "From: " + fromTo.From,
// B-encoding keeps non-ASCII subjects valid inside a mail header.
57 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
63 "Content-Type: text/plain; charset=utf-8",
64 "Content-Transfer-Encoding: base64",
66 base64.StdEncoding.EncodeToString(body),
69 return strings.NewReader(strings.Join(lines, "\n"))
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73 pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74 pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76 pktSize -= poly1305.TagSize
78 pktSize -= pktSizeBlocks * poly1305.TagSize
// JobRepeatProcess is a sentinel error. The area-handling code in this file
// returns it after its subscriber echo loop, and Toss compares a job's result
// against it and, on a match, seeks the packet file back to its start.
82 var JobRepeatProcess = errors.New("needs processing repeat")
// NOTE(review): the `func` line of this packet-processing routine is not part
// of this excerpt; only the tail of its parameter list is visible here.
// decompressor is the zstd decoder that the exec branch resets onto the
// packet stream. The boolean flags are passed on unchanged when the area
// branch processes a nested packet.
93 decompressor *zstd.Decoder,
94 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
// Command line registered under the "sendmail" handle of our own neighbour
// entry; the branches below use it to run notification commands.
97 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
// Decode the XDR-encoded packet header from pipeR into pkt.
99 _, err := xdr.Unmarshal(pipeR, &pkt)
101 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102 return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
// Attach the packet size to the structured log entries and emit a debug line.
106 les = append(les, LE{"Size", int64(pktSize)})
107 ctx.LogD("rx", les, func(les LEs) string {
109 "Tossing %s/%s (%s)",
110 sender.Name, pktName,
111 humanize.IBytes(pktSize),
// Exec packets, both PktTypeExec and PktTypeExecFat.
// NOTE(review): some guard lines of this branch are missing from this
// excerpt.
115 case PktTypeExec, PktTypeExecFat:
// The packet path is a NUL-separated list: the first element is the handle,
// the remaining elements are arguments supplied by the sender.
// NOTE(review): the local name "path" shadows the path package (used as
// path.Dir in the file branch) for the rest of this scope.
119 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120 handle := string(path[0])
121 args := make([]string, 0, len(path)-1)
122 for _, p := range path[1:] {
123 args = append(args, string(p))
// "handle arg1 arg2 ..." form, used for logging and the notification subject.
125 argsStr := strings.Join(append([]string{handle}, args...), " ")
126 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
// The command line is looked up by handle in the sender's Exec map; an empty
// result is reported as "No handle found".
127 cmdline := sender.Exec[handle]
128 if len(cmdline) == 0 {
129 err = errors.New("No handle found")
131 "rx-no-handle", les, err,
132 func(les LEs) string {
134 "Tossing exec %s/%s (%s): %s",
135 sender.Name, pktName,
136 humanize.IBytes(pktSize), argsStr,
// For PktTypeExec the decompressor is pointed at the packet stream so the
// command reads decompressed data (see cmd.Stdin below).
142 if pkt.Type == PktTypeExec {
143 if err = decompressor.Reset(pipeR); err != nil {
// Configured command line with the packet-supplied arguments appended.
148 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
// Environment entries exposing our node id, the sender's id and the packet
// niceness to the command.
151 "NNCP_SELF="+ctx.Self.Id.String(),
152 "NNCP_SENDER="+sender.Id.String(),
153 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
155 if pkt.Type == PktTypeExec {
156 cmd.Stdin = decompressor
// Run the command, capturing stdout and stderr together; the output is split
// into lines and attached to the log entry.
160 output, err := cmd.CombinedOutput()
162 les = append(les, LE{"Output", strings.Split(
163 strings.Trim(string(output), "\n"), "\n"),
165 ctx.LogE("rx-handle", les, err, func(les LEs) string {
167 "Tossing exec %s/%s (%s): %s: handling",
168 sender.Name, pktName,
169 humanize.IBytes(uint64(pktSize)), argsStr,
// Optional mail notification: the recipient is looked up under
// "<sender>.<handle>" first, then under the wildcard "*.<handle>".
174 if len(sendmail) > 0 && ctx.NotifyExec != nil {
175 notify := ctx.NotifyExec[sender.Name+"."+handle]
177 notify = ctx.NotifyExec["*."+handle]
182 append(sendmail[1:], notify.To)...,
184 cmd.Stdin = newNotification(notify, fmt.Sprintf(
185 "Exec from %s: %s", sender.Name, argsStr,
187 if err = cmd.Run(); err != nil {
188 ctx.LogE("rx-notify", les, err, func(les LEs) string {
190 "Tossing exec %s/%s (%s): %s: notifying",
191 sender.Name, pktName,
192 humanize.IBytes(pktSize), argsStr,
199 ctx.LogI("rx", les, func(les LEs) string {
201 "Got exec from %s to %s (%s)",
202 sender.Name, argsStr,
203 humanize.IBytes(pktSize),
// Post-processing of the job file, skipped on dry runs and when there is no
// job path: create the "seen" marker, sync the directory, remove the job.
206 if !dryRun && jobPath != "" {
208 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
211 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
213 if err = DirSync(filepath.Dir(jobPath)); err != nil {
214 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" although it is emitted from
// the exec branch.
216 "Tossing file %s/%s (%s): %s: dirsyncing",
217 sender.Name, pktName,
218 humanize.IBytes(pktSize),
219 filepath.Base(jobPath),
// NOTE(review): a failed os.Remove of the job is logged under "rx-notify"
// with a ": notifying" suffix, whereas the file and freq branches log the
// same failure as "rx-remove" / ": removing". This looks like a copy-paste
// slip that makes removal errors appear as notification errors.
226 if err = os.Remove(jobPath); err != nil {
227 ctx.LogE("rx-notify", les, err, func(les LEs) string {
229 "Tossing exec %s/%s (%s): %s: notifying",
230 sender.Name, pktName,
231 humanize.IBytes(pktSize), argsStr,
// Best effort: the error of removing the companion header file is ignored.
235 } else if ctx.HdrUsage {
236 os.Remove(JobPath2Hdr(jobPath))
// File packets: the packet path is the destination name, relative to the
// sender's incoming directory.
// NOTE(review): the `case` line of this branch is missing from this excerpt.
244 dst := string(pkt.Path[:int(pkt.PathLen)])
245 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
// NOTE(review): in the visible lines only absolute paths are rejected. A dst
// containing ".." elements is cleaned by filepath.Join below and may resolve
// outside *incoming — confirm that this is validated elsewhere.
246 if filepath.IsAbs(dst) {
247 err = errors.New("non-relative destination path")
249 "rx-non-rel", les, err,
250 func(les LEs) string {
252 "Tossing file %s/%s (%s): %s",
253 sender.Name, pktName,
254 humanize.IBytes(pktSize), dst,
// Senders without a configured incoming directory may not deliver files.
260 incoming := sender.Incoming
262 err = errors.New("incoming is not allowed")
264 "rx-no-incoming", les, err,
265 func(les LEs) string {
267 "Tossing file %s/%s (%s): %s",
268 sender.Name, pktName,
269 humanize.IBytes(pktSize), dst,
// Create the destination directory (slash-separated dirname of dst under
// *incoming).
275 dir := filepath.Join(*incoming, path.Dir(dst))
276 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
277 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
279 "Tossing file %s/%s (%s): %s: mkdir",
280 sender.Name, pktName,
281 humanize.IBytes(pktSize), dst,
// The payload is first written to a temporary file in the destination
// directory and renamed into place afterwards.
287 tmp, err := TempFile(dir, "file")
289 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
291 "Tossing file %s/%s (%s): %s: mktemp",
292 sender.Name, pktName,
293 humanize.IBytes(pktSize), dst,
// The "Tmp" log entry is appended here and dropped again after the rename.
298 les = append(les, LE{"Tmp", tmp.Name()})
299 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
301 "Tossing file %s/%s (%s): %s: created: %s",
302 sender.Name, pktName,
303 humanize.IBytes(pktSize), dst, tmp.Name(),
// Copy the payload through a buffered writer, then flush, sync and close the
// temporary file; each step has its own error log.
306 bufW := bufio.NewWriter(tmp)
307 if _, err = CopyProgressed(
308 bufW, pipeR, "Rx file",
309 append(les, LE{"FullSize", int64(pktSize)}),
312 ctx.LogE("rx-copy", les, err, func(les LEs) string {
314 "Tossing file %s/%s (%s): %s: copying",
315 sender.Name, pktName,
316 humanize.IBytes(pktSize), dst,
321 if err = bufW.Flush(); err != nil {
323 ctx.LogE("rx-flush", les, err, func(les LEs) string {
325 "Tossing file %s/%s (%s): %s: flushing",
326 sender.Name, pktName,
327 humanize.IBytes(pktSize), dst,
333 if err = tmp.Sync(); err != nil {
335 ctx.LogE("rx-sync", les, err, func(les LEs) string {
337 "Tossing file %s/%s (%s): %s: syncing",
338 sender.Name, pktName,
339 humanize.IBytes(pktSize), dst,
345 if err = tmp.Close(); err != nil {
346 ctx.LogE("rx-close", les, err, func(les LEs) string {
348 "Tossing file %s/%s (%s): %s: closing",
349 sender.Name, pktName,
350 humanize.IBytes(pktSize), dst,
// Pick a destination name that does not exist yet: start with the requested
// name and, while os.Stat finds an existing entry, try "<name>.<counter>".
355 dstPathOrig := filepath.Join(*incoming, dst)
356 dstPath := dstPathOrig
359 if _, err = os.Stat(dstPath); err != nil {
360 if os.IsNotExist(err) {
363 ctx.LogE("rx-stat", les, err, func(les LEs) string {
365 "Tossing file %s/%s (%s): %s: stating: %s",
366 sender.Name, pktName,
367 humanize.IBytes(pktSize), dst, dstPath,
372 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
// Move the finished temporary file to its final name.
375 if err = os.Rename(tmp.Name(), dstPath); err != nil {
376 ctx.LogE("rx-rename", les, err, func(les LEs) string {
378 "Tossing file %s/%s (%s): %s: renaming",
379 sender.Name, pktName,
380 humanize.IBytes(pktSize), dst,
// NOTE(review): the directory synced here is *incoming, while the file was
// renamed inside dir, which is a subdirectory when dst contains a directory
// part — confirm that syncing the top-level directory is intended.
385 if err = DirSync(*incoming); err != nil {
386 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
388 "Tossing file %s/%s (%s): %s: dirsyncing",
389 sender.Name, pktName,
390 humanize.IBytes(pktSize), dst,
395 les = les[:len(les)-1] // delete Tmp
397 ctx.LogI("rx", les, func(les LEs) string {
399 "Got file %s (%s) from %s",
400 dst, humanize.IBytes(pktSize), sender.Name,
// Job post-processing: create the "seen" marker, sync the directory, remove
// the job file (the guarding conditions are not visible in this excerpt).
406 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
409 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
411 if err = DirSync(filepath.Dir(jobPath)); err != nil {
412 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
414 "Tossing file %s/%s (%s): %s: dirsyncing",
415 sender.Name, pktName,
416 humanize.IBytes(pktSize),
417 filepath.Base(jobPath),
424 if err = os.Remove(jobPath); err != nil {
425 ctx.LogE("rx-remove", les, err, func(les LEs) string {
427 "Tossing file %s/%s (%s): %s: removing",
428 sender.Name, pktName,
429 humanize.IBytes(pktSize), dst,
// Best effort: the error of removing the companion header file is ignored.
433 } else if ctx.HdrUsage {
434 os.Remove(JobPath2Hdr(jobPath))
// Optional mail notification to the address configured in ctx.NotifyFile.
437 if len(sendmail) > 0 && ctx.NotifyFile != nil {
440 append(sendmail[1:], ctx.NotifyFile.To)...,
442 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
443 "File from %s: %s (%s)",
444 sender.Name, dst, humanize.IBytes(pktSize),
446 if err = cmd.Run(); err != nil {
447 ctx.LogE("rx-notify", les, err, func(les LEs) string {
449 "Tossing file %s/%s (%s): %s: notifying",
450 sender.Name, pktName,
451 humanize.IBytes(pktSize), dst,
// File-request ("freq") packets: the packet path names the requested source
// file and the payload carries the destination name for the reply.
// NOTE(review): the `case` line of this branch is missing from this excerpt.
462 src := string(pkt.Path[:int(pkt.PathLen)])
// les is redeclared here, so entries appended in this branch stay local to it.
463 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
464 if filepath.IsAbs(src) {
465 err = errors.New("non-relative source path")
467 "rx-non-rel", les, err,
468 func(les LEs) string {
// NOTE(review): this message ends in ": notifying" although it reports a
// rejected absolute source path — looks copy-pasted.
470 "Tossing freq %s/%s (%s): %s: notifying",
471 sender.Name, pktName,
472 humanize.IBytes(pktSize), src,
// The whole remaining payload is read into memory and used as the
// destination path.
// NOTE(review): ioutil.ReadAll is deprecated since Go 1.16 in favour of
// io.ReadAll.
478 dstRaw, err := ioutil.ReadAll(pipeR)
480 ctx.LogE("rx-read", les, err, func(les LEs) string {
482 "Tossing freq %s/%s (%s): %s: reading",
483 sender.Name, pktName,
484 humanize.IBytes(pktSize), src,
489 dst := string(dstRaw)
490 les = append(les, LE{"Dst", dst})
// Senders without a configured FreqPath may not request files.
491 freqPath := sender.FreqPath
493 err = errors.New("freqing is not allowed")
495 "rx-no-freq", les, err,
496 func(les LEs) string {
498 "Tossing freq %s/%s (%s): %s -> %s",
499 sender.Name, pktName,
500 humanize.IBytes(pktSize), src, dst,
// NOTE(review): src is only checked with filepath.IsAbs in the visible
// lines; ".." elements would be resolved by filepath.Join and may point
// outside *freqPath — confirm that this is validated elsewhere.
510 filepath.Join(*freqPath, src),
518 ctx.LogE("rx-tx", les, err, func(les LEs) string {
520 "Tossing freq %s/%s (%s): %s -> %s: txing",
521 sender.Name, pktName,
522 humanize.IBytes(pktSize), src, dst,
528 ctx.LogI("rx", les, func(les LEs) string {
529 return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
// Job post-processing: create the "seen" marker, sync the directory, remove
// the job file (the guarding conditions are not visible in this excerpt).
534 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
537 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
539 if err = DirSync(filepath.Dir(jobPath)); err != nil {
540 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" although it is emitted from
// the freq branch.
542 "Tossing file %s/%s (%s): %s: dirsyncing",
543 sender.Name, pktName,
544 humanize.IBytes(pktSize),
545 filepath.Base(jobPath),
552 if err = os.Remove(jobPath); err != nil {
553 ctx.LogE("rx-remove", les, err, func(les LEs) string {
555 "Tossing freq %s/%s (%s): %s -> %s: removing",
556 sender.Name, pktName,
557 humanize.IBytes(pktSize), src, dst,
// Best effort: the error of removing the companion header file is ignored.
561 } else if ctx.HdrUsage {
562 os.Remove(JobPath2Hdr(jobPath))
// Optional mail notification to the address configured in ctx.NotifyFreq.
565 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
568 append(sendmail[1:], ctx.NotifyFreq.To)...,
570 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
571 "Freq from %s: %s", sender.Name, src,
573 if err = cmd.Run(); err != nil {
574 ctx.LogE("rx-notify", les, err, func(les LEs) string {
576 "Tossing freq %s/%s (%s): %s -> %s: notifying",
577 sender.Name, pktName,
578 humanize.IBytes(pktSize), src, dst,
// Transitional ("trns") packets: the packet path holds the id of the node
// the payload has to be relayed to.
// NOTE(review): the `case` line of this branch is missing from this excerpt.
589 dst := new([MTHSize]byte)
590 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
591 nodeId := NodeId(*dst)
// les is redeclared here, so entries appended in this branch stay local to it.
592 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
593 logMsg := func(les LEs) string {
595 "Tossing trns %s/%s (%s): %s",
596 sender.Name, pktName,
597 humanize.IBytes(pktSize),
// The target must be a known neighbour.
601 node := ctx.Neigh[nodeId]
603 err = errors.New("unknown node")
604 ctx.LogE("rx-unknown", les, err, logMsg)
607 ctx.LogD("rx-tx", les, logMsg)
// Without a Via list the payload is handed to TxTrns for that node as is.
609 if len(node.Via) == 0 {
610 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
611 ctx.LogE("rx", les, err, func(les LEs) string {
612 return logMsg(les) + ": txing"
// With a Via list, the last Via entry becomes the node to send to: a
// temporary Node is built from that entry's Id and ExchPub with the remaining
// Via entries, and a new trns packet addressed to nodeId is created for
// ctx.Tx.
617 via := node.Via[:len(node.Via)-1]
618 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
619 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
620 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
624 if _, _, _, err = ctx.Tx(
628 int64(pktSize), 0, MaxFileSize,
633 ctx.LogE("rx", les, err, func(les LEs) string {
634 return logMsg(les) + ": txing"
640 ctx.LogI("rx", les, func(les LEs) string {
642 "Got transitional packet from %s to %s (%s)",
644 ctx.NodeName(&nodeId),
645 humanize.IBytes(pktSize),
// Post-processing of the job file, skipped on dry runs and when there is no
// job path: create the "seen" marker, sync the directory, remove the job.
648 if !dryRun && jobPath != "" {
650 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
653 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
655 if err = DirSync(filepath.Dir(jobPath)); err != nil {
656 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" although it is emitted from
// the trns branch.
658 "Tossing file %s/%s (%s): %s: dirsyncing",
659 sender.Name, pktName,
660 humanize.IBytes(pktSize),
661 filepath.Base(jobPath),
668 if err = os.Remove(jobPath); err != nil {
669 ctx.LogE("rx", les, err, func(les LEs) string {
671 "Tossing trns %s/%s (%s): %s: removing",
672 sender.Name, pktName,
673 humanize.IBytes(pktSize),
674 ctx.NodeName(&nodeId),
// Best effort: the error of removing the companion header file is ignored.
678 } else if ctx.HdrUsage {
679 os.Remove(JobPath2Hdr(jobPath))
// Area packets: the packet path holds an area id and the payload is another
// encrypted packet addressed to that area.
// NOTE(review): the `case` line and several guards of this branch are
// missing from this excerpt.
687 areaId := new(AreaId)
688 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
// les is redeclared here, so entries appended in this branch stay local to it.
689 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
690 logMsg := func(les LEs) string {
692 "Tossing %s/%s (%s): area %s",
693 sender.Name, pktName,
694 humanize.IBytes(pktSize),
695 ctx.AreaName(areaId),
// The area must be known to this node.
698 area := ctx.AreaId2Area[*areaId]
700 err = errors.New("unknown area")
701 ctx.LogE("rx-area-unknown", les, err, logMsg)
// Read the inner packet's encrypted header; fullPipeR puts the raw header
// bytes back in front of the remaining stream so the complete inner packet
// can be read again from the start.
704 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
705 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
707 ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
// The message is identified by the Base32-encoded BLAKE2b-256 hash of the
// raw inner header; that string is used as the name of per-node marker files.
710 msgHashRaw := blake2b.Sum256(pktEncRaw)
711 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
712 les = append(les, LE{"AreaMsg", msgHash})
713 ctx.LogD("rx-area", les, logMsg)
// First pass over the subscribers: in the visible lines it only stats the
// marker file and logs, without sending anything.
// NOTE(review): presumably this is the dry-run variant — the guarding
// condition is not visible; confirm.
716 for _, nodeId := range area.Subs {
717 node := ctx.Neigh[*nodeId]
718 lesEcho := append(les, LE{"Echo", nodeId})
719 seenDir := filepath.Join(
720 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
722 seenPath := filepath.Join(seenDir, msgHash)
723 logMsgNode := func(les LEs) string {
725 "%s: echoing to: %s", logMsg(les), node.Name,
728 if _, err := os.Stat(seenPath); err == nil {
729 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
730 return logMsgNode(les) + ": already sent"
734 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
// Second pass: echo the packet to every subscriber that has no marker file
// for this message yet, then create the marker.
737 for _, nodeId := range area.Subs {
738 node := ctx.Neigh[*nodeId]
739 lesEcho := append(les, LE{"Echo", nodeId})
740 seenDir := filepath.Join(
741 ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
743 seenPath := filepath.Join(seenDir, msgHash)
744 logMsgNode := func(les LEs) string {
745 return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
747 if _, err := os.Stat(seenPath); err == nil {
748 ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
749 return logMsgNode(les) + ": already sent"
// NOTE(review): nodeId and pktEnc.Sender are pointers (both are dereferenced
// elsewhere in this branch), so this compares pointer identity rather than
// node-id values. Equal ids stored in different allocations would compare
// unequal and the packet would be echoed back to its origin — confirm that
// these pointers are canonical or compare the dereferenced values.
753 if nodeId != sender.Id && nodeId != pktEnc.Sender {
754 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
755 if _, _, _, err = ctx.Tx(
759 int64(pktSize), 0, MaxFileSize,
764 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
768 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
769 ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
772 if fd, err := os.Create(seenPath); err == nil {
774 if err = DirSync(seenDir); err != nil {
// NOTE(review): this call logs with les while the neighbouring calls in this
// loop use lesEcho, so the "Echo" entry is missing from this record.
775 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
779 ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
// Ask the caller to process the same job once more (Toss seeks the packet
// file back to its start when it receives this value).
782 return JobRepeatProcess
// Marker for our own node: when it already exists the message was handled
// before and the job is simply removed.
786 seenDir := filepath.Join(
787 ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
789 seenPath := filepath.Join(seenDir, msgHash)
790 if _, err := os.Stat(seenPath); err == nil {
791 ctx.LogD("rx-area-seen", les, func(les LEs) string {
792 return logMsg(les) + ": already seen"
794 if !dryRun && jobPath != "" {
795 if err = os.Remove(jobPath); err != nil {
796 ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
798 "Tossing area %s/%s (%s): %s: removing",
799 sender.Name, pktName,
800 humanize.IBytes(pktSize),
// Best effort: the error of removing the companion header file is ignored.
805 } else if ctx.HdrUsage {
806 os.Remove(JobPath2Hdr(jobPath))
813 ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
814 return logMsg(les) + ": no private key for decoding"
// The signature is verified by default. A sender unknown to this node is an
// error unless the area allows unknown senders, in which case verification
// is switched off.
817 signatureVerify := true
818 if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
819 if !area.AllowUnknown {
820 err = errors.New("unknown sender")
823 append(les, LE{"Sender", pktEnc.Sender}),
825 func(les LEs) string {
826 return logMsg(les) + ": sender: " + pktEnc.Sender.String()
831 signatureVerify = false
// Build stand-in node objects from the area's id, private key and incoming
// directory; they are used to decrypt and deliver the inner packet.
833 areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
834 copy(areaNodeOur.Id[:], area.Id[:])
835 copy(areaNodeOur.ExchPrv[:], area.Prv[:])
839 Incoming: area.Incoming,
842 copy(areaNode.Id[:], area.Id[:])
// pktName and pipeR are redeclared here and shadow the outer values for the
// nested processing below.
843 pktName := fmt.Sprintf(
845 Base32Codec.EncodeToString(areaId[:]), msgHash,
// The inner packet is decrypted into a pipe whose read end is consumed by a
// nested processing call; its result is delivered through errs.
848 pipeR, pipeW := io.Pipe()
849 errs := make(chan error, 1)
858 uint64(pktSizeWithoutEnc(int64(pktSize))),
861 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
864 _, _, _, err = PktEncRead(
873 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
// Propagate the decryption error to the reader side of the pipe.
874 pipeW.CloseWithError(err)
879 if err = <-errs; err != nil {
// Record the message as seen for our own node and remove the job file.
884 if !dryRun && jobPath != "" {
885 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
886 ctx.LogE("rx-area-mkdir", les, err, logMsg)
889 if fd, err := os.Create(seenPath); err == nil {
891 if err = DirSync(seenDir); err != nil {
892 ctx.LogE("rx-area-dirsync", les, err, logMsg)
896 if err = os.Remove(jobPath); err != nil {
897 ctx.LogE("rx", les, err, func(les LEs) string {
899 "Tossing area %s/%s (%s): %s: removing",
900 sender.Name, pktName,
901 humanize.IBytes(pktSize),
// Best effort: the error of removing the companion header file is ignored.
906 } else if ctx.HdrUsage {
907 os.Remove(JobPath2Hdr(jobPath))
// ACK packets: the first MTHSize bytes of the packet path, Base32-encoded,
// name the outbound packet that is being acknowledged.
// NOTE(review): the `case` line of this branch is missing from this excerpt;
// unlike the other branches, pkt.PathLen is not consulted here.
915 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
// les is redeclared here, so entries appended in this branch stay local to it.
916 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
917 logMsg := func(les LEs) string {
918 return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
920 ctx.LogD("rx-ack", les, logMsg)
// The acknowledged packet is looked up in the sender's TTx spool directory
// and removed when present; otherwise "already disappeared" is logged.
921 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
922 if _, err := os.Stat(pktPath); err == nil {
924 if err = os.Remove(pktPath); err != nil {
925 ctx.LogE("rx-ack", les, err, func(les LEs) string {
926 return logMsg(les) + ": removing packet"
// Best effort: the error of removing the companion header file is ignored.
929 } else if ctx.HdrUsage {
930 os.Remove(JobPath2Hdr(pktPath))
934 ctx.LogD("rx-ack", les, func(les LEs) string {
935 return logMsg(les) + ": already disappeared"
// NOTE(review): the exec, trns and area branches guard this step with
// jobPath != "", but the visible condition here does not — confirm that an
// empty jobPath cannot reach this point.
938 if !dryRun && doSeen {
939 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
942 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
944 if err = DirSync(filepath.Dir(jobPath)); err != nil {
945 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
// NOTE(review): this message says "Tossing file" although it is emitted from
// the ack branch.
947 "Tossing file %s/%s (%s): %s: dirsyncing",
948 sender.Name, pktName,
949 humanize.IBytes(pktSize),
950 filepath.Base(jobPath),
958 if err = os.Remove(jobPath); err != nil {
959 ctx.LogE("rx", les, err, func(les LEs) string {
960 return logMsg(les) + ": removing job"
// Best effort: the error of removing the companion header file is ignored.
963 } else if ctx.HdrUsage {
964 os.Remove(JobPath2Hdr(jobPath))
967 ctx.LogI("rx", les, func(les LEs) string {
968 return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
// Fallback for packet types not handled above: log an "unknown type" error.
// NOTE(review): the enclosing `default:` and return lines are missing from
// this excerpt.
972 err = errors.New("unknown type")
974 "rx-type-unknown", les, err,
975 func(les LEs) string {
977 "Tossing %s/%s (%s)",
978 sender.Name, pktName, humanize.IBytes(pktSize),
// Toss processes the spooled jobs of one node.
//
// In the visible lines it takes the "toss" directory lock for nodeId, creates
// one zstd decoder for the whole run, and iterates over ctx.Jobs(nodeId, xx).
// Jobs whose niceness exceeds nice are skipped with a debug log. Each
// remaining job file is opened, its sender is looked up in ctx.Neigh, and the
// packet is decrypted with PktEncRead into a pipe whose other end is consumed
// by the packet-processing call; that call's result arrives on errs.
//
// NOTE(review): parts of the signature, the return value and most
// error-handling lines are missing from this excerpt.
987 func (ctx *Ctx) Toss(
991 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
// The lock is released when Toss returns.
993 dirLock, err := ctx.LockDir(nodeId, "toss")
997 defer ctx.UnlockDir(dirLock)
// One decoder is created with a nil reader and closed when Toss returns.
999 decompressor, err := zstd.NewReader(nil)
1003 defer decompressor.Close()
1004 for job := range ctx.Jobs(nodeId, xx) {
1005 pktName := filepath.Base(job.Path)
1007 {"Node", job.PktEnc.Sender},
1009 {"Nice", int(job.PktEnc.Nice)},
// Jobs nicer than the requested level are not processed in this run.
1011 if job.PktEnc.Nice > nice {
1012 ctx.LogD("rx-too-nice", les, func(les LEs) string {
1014 "Tossing %s/%s: too nice: %s",
1015 ctx.NodeName(job.PktEnc.Sender), pktName,
1016 NicenessFmt(job.PktEnc.Nice),
1021 fd, err := os.Open(job.Path)
1023 ctx.LogE("rx-open", les, err, func(les LEs) string {
1025 "Tossing %s/%s: opening %s",
1026 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
// The sender recorded in the packet header must be a known neighbour.
1032 sender := ctx.Neigh[*job.PktEnc.Sender]
1034 err := errors.New("unknown node")
1035 ctx.LogE("rx-open", les, err, func(les LEs) string {
1038 ctx.NodeName(job.PktEnc.Sender), pktName,
// errs is buffered with capacity 1 and carries the result of the processing
// call.
1044 errs := make(chan error, 1)
1045 var sharedKey []byte
1047 pipeR, pipeW := io.Pipe()
1056 uint64(pktSizeWithoutEnc(job.Size)),
1059 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
// Decrypted data is written to the pipe through a buffered writer.
1062 pipeWB := bufio.NewWriter(pipeW)
1063 sharedKey, _, _, err = PktEncRead(
1066 bufio.NewReaderSize(fd, MTHBlockSize),
// Errors from decryption or from the final flush are passed to the reader
// side of the pipe.
1072 pipeW.CloseWithError(err)
1074 if err := pipeWB.Flush(); err != nil {
1075 pipeW.CloseWithError(err)
// JobRepeatProcess asks for the same packet to be processed again, so the
// file is rewound to its start first.
1085 if err = <-errs; err == JobRepeatProcess {
1086 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1087 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1089 "Tossing %s/%s: can not seek",
1090 ctx.NodeName(job.PktEnc.Sender),
1098 } else if err != nil {
// AutoToss sets up a directory watcher on the node's TRx spool directory and
// returns two channels: finish (chan struct{}) and badCode (chan bool).
//
// In the visible lines a call receives nodeId, TRx, nice, a literal false
// and the caller's flags, and its result is OR-ed into bad.
//
// NOTE(review): presumably that call is ctx.Toss with dryRun=false, running
// in a goroutine that is stopped through finish and reports through badCode —
// those lines are missing from this excerpt; confirm against the full file.
1106 func (ctx *Ctx) AutoToss(
1109 doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
1110 ) (chan struct{}, chan bool) {
1111 dw, err := ctx.NewDirWatcher(
1112 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
// Both channels are unbuffered.
1118 finish := make(chan struct{})
1119 badCode := make(chan bool)
1130 nodeId, TRx, nice, false,
1131 doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
1135 return finish, badCode