2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
30 xdr "github.com/davecgh/go-xdr/xdr2"
31 "github.com/dustin/go-humanize"
// HshValue points to an MTHSize-byte array. jobsFind allocates it with
// new([MTHSize]byte) and copies into it the value decoded from the job's
// Base32 file name.
// NOTE(review): the enclosing struct header is not visible in this excerpt
// (presumably the Job type) — confirm.
47 HshValue *[MTHSize]byte
// JobPath2Hdr maps the path of a job file to the path of its header file:
// the same base name, placed inside the HdrDir subdirectory of the job's
// own directory (<dir>/<HdrDir>/<base>).
50 func JobPath2Hdr(jobPath string) string {
51 return filepath.Join(filepath.Dir(jobPath), HdrDir, filepath.Base(jobPath))
// HdrRead decodes a PktEnc from r with xdr.Unmarshal and then re-encodes the
// decoded value with xdr.Marshal.
//
// On the success path it returns a pointer to the decoded PktEnc, the
// re-encoded bytes and a nil error.
// NOTE(review): the handling of a failed Unmarshal or Marshal is not visible
// in this excerpt — confirm what is returned (or raised) on failure.
54 func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) {
56 _, err := xdr.Unmarshal(r, &pktEnc)
// Serialise the decoded header again; jobsFind hands these bytes to HdrWrite
// when it stores a separate header file.
61 if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
64 return &pktEnc, raw.Bytes(), nil
// HdrWrite stores the encoded packet header pktEncRaw as the header file of
// the job at path tgt, i.e. at JobPath2Hdr(tgt).
//
// The bytes are written to a temporary file obtained from ctx.NewTmpFile,
// the file is closed, ensureDir is called for HdrDir under tgt's directory,
// and the temporary file is then renamed to its final path. Every failing
// step is reported through ctx.LogE under its own "hdr-write-*" identifier.
//
// NOTE(review): the return statements are not visible in this excerpt;
// presumably the failing step's error is returned — confirm.
// NOTE(review): the temporary file is removed when writing or closing fails,
// but no os.Remove is visible on the ensureDir and os.Rename failure paths —
// confirm whether the temporary file is left behind in those cases.
67 func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
68 tmpHdr, err := ctx.NewTmpFile()
70 ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string {
71 return "Header writing: new temporary file"
75 if _, err = tmpHdr.Write(pktEncRaw); err != nil {
76 ctx.LogE("hdr-write-write", nil, err, func(les LEs) string {
77 return "Header writing: writing"
// Do not leave a partially written temporary file behind.
79 os.Remove(tmpHdr.Name())
82 if err = tmpHdr.Close(); err != nil {
83 ctx.LogE("hdr-write-close", nil, err, func(les LEs) string {
84 return "Header writing: closing"
86 os.Remove(tmpHdr.Name())
// ensureDir receives the job's directory and HdrDir; presumably it creates
// the header subdirectory if it is missing — verify against ensureDir.
89 if err = ensureDir(filepath.Dir(tgt), HdrDir); err != nil {
90 ctx.LogE("hdr-write-ensure-mkdir", nil, err, func(les LEs) string {
91 return "Header writing: ensuring directory"
// The header only shows up under its final name after it has been fully
// written and closed.
95 if err = os.Rename(tmpHdr.Name(), JobPath2Hdr(tgt)); err != nil {
96 ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string {
97 return "Header writing: renaming"
// jobsFind lists the spool directory <ctx.Spool>/<nodeId>/<xx> and returns a
// channel of Job values (buffered, capacity 16) built from its entries.
//
// Three kinds of file names are recognised: names ending in NoCKSuffix,
// names ending in PartSuffix, and plain names. In every case the part
// without the suffix must be Base32Encoded32Len characters long and is
// decoded with Base32Codec into the job's hash value.
// NOTE(review): the conditions choosing between the three kinds are not
// visible in this excerpt; presumably nock selects NoCKSuffix names and part
// selects PartSuffix names (JobsNoCK and JobsPart pass those flags) — confirm.
// NOTE(review): the code that sends on and closes the channel, and that
// closes the opened files, is not visible here either.
104 func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job {
// Directory that holds this node's jobs for the requested direction.
105 rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
106 jobs := make(chan Job, 16)
109 dir, err := os.Open(rxPath)
// Readdir with n <= 0 returns all directory entries in a single slice.
113 fis, err := dir.Readdir(0)
118 for _, fi := range fis {
// NoCKSuffix names: the condition is true for names that lack the suffix or
// whose length is not exactly encoded hash plus suffix (the consequent —
// presumably skipping the entry — is not visible).
122 if !strings.HasSuffix(name, NoCKSuffix) ||
123 len(name) != Base32Encoded32Len+len(NoCKSuffix) {
126 hshValue, err = Base32Codec.DecodeString(
127 strings.TrimSuffix(name, NoCKSuffix),
// PartSuffix names: same check and decoding with the other suffix.
130 if !strings.HasSuffix(name, PartSuffix) ||
131 len(name) != Base32Encoded32Len+len(PartSuffix) {
134 hshValue, err = Base32Codec.DecodeString(
135 strings.TrimSuffix(name, PartSuffix),
// Plain names: the whole name must be the encoded hash.
138 if len(name) != Base32Encoded32Len {
141 hshValue, err = Base32Codec.DecodeString(name)
146 pth := filepath.Join(rxPath, name)
// Either the job file itself is opened, or its separate header file is tried
// first with a fallback to the job file when that header does not exist.
// NOTE(review): the condition choosing between the two is not visible.
150 fd, err = os.Open(pth)
152 fd, err = os.Open(JobPath2Hdr(pth))
153 if err != nil && errors.Is(err, fs.ErrNotExist) {
155 fd, err = os.Open(pth)
// A job value is built with a freshly allocated hash array filled from the
// decoded file name; this construction appears twice in the function.
165 HshValue: new([MTHSize]byte),
167 copy(job.HshValue[:], hshValue)
// Decode the packet header; pktEncRaw keeps its re-encoded bytes.
171 pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
// Each assignment below sets err to the TooOld() error of an older format
// magic (v1 through v5); the case labels selecting them are not visible.
176 switch pktEnc.Magic {
178 err = MagicNNCPEv1.TooOld()
180 err = MagicNNCPEv2.TooOld()
182 err = MagicNNCPEv3.TooOld()
184 err = MagicNNCPEv4.TooOld()
186 err = MagicNNCPEv5.TooOld()
// Error report: the message carries the on-disk size in human-readable form.
196 }, err, func(les LEs) string {
198 "Job %s/%s size: %s",
200 humanize.IBytes(uint64(fi.Size())),
// Log entry for the job: sender node, niceness and on-disk size.
207 {"Node", pktEnc.Sender},
209 {"Nice", int(pktEnc.Nice)},
211 }, func(les LEs) string {
213 "Job %s/%s/%s nice: %s size: %s",
214 pktEnc.Sender, string(xx), name,
215 NicenessFmt(pktEnc.Nice),
216 humanize.IBytes(uint64(fi.Size())),
// Store a separate header file when none was found and header usage is
// enabled. The error returned by HdrWrite is not checked here; HdrWrite
// reports its own failures through ctx.LogE.
// NOTE(review): hdrExists is assigned in lines not visible in this excerpt.
219 if !hdrExists && ctx.HdrUsage {
220 ctx.HdrWrite(pktEncRaw, pth)
226 HshValue: new([MTHSize]byte),
228 copy(job.HshValue[:], hshValue)
// Jobs returns the channel produced by jobsFind for nodeId in direction xx,
// with both the nock and part flags set to false.
235 func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
236 return ctx.jobsFind(nodeId, xx, false, false)
// JobsNoCK returns the channel produced by jobsFind for nodeId in the TRx
// direction with the nock flag set and the part flag cleared.
239 func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
240 return ctx.jobsFind(nodeId, TRx, true, false)
// JobsPart returns the channel produced by jobsFind for nodeId in the TRx
// direction with the part flag set and the nock flag cleared.
243 func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job {
244 return ctx.jobsFind(nodeId, TRx, false, true)