]> Cypherpunks.ru repositories - nncp.git/blob - src/jobs.go
Raise copyright years
[nncp.git] / src / jobs.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "fmt"
23         "io"
24         "os"
25         "path/filepath"
26         "strings"
27
28         xdr "github.com/davecgh/go-xdr/xdr2"
29         "github.com/dustin/go-humanize"
30 )
31
32 type TRxTx string
33
34 const (
35         TRx TRxTx = "rx"
36         TTx TRxTx = "tx"
37
38         HdrDir = "hdr"
39 )
40
41 type Job struct {
42         PktEnc   *PktEnc
43         Path     string
44         Size     int64
45         HshValue *[MTHSize]byte
46 }
47
48 func JobPath2Hdr(jobPath string) string {
49         return filepath.Join(filepath.Dir(jobPath), HdrDir, filepath.Base(jobPath))
50 }
51
52 func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) {
53         var pktEnc PktEnc
54         _, err := xdr.Unmarshal(r, &pktEnc)
55         if err != nil {
56                 return nil, nil, err
57         }
58         var raw bytes.Buffer
59         if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
60                 panic(err)
61         }
62         return &pktEnc, raw.Bytes(), nil
63 }
64
65 func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
66         tmpHdr, err := ctx.NewTmpFile()
67         if err != nil {
68                 ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string {
69                         return "Header writing: new temporary file"
70                 })
71                 return err
72         }
73         if _, err = tmpHdr.Write(pktEncRaw); err != nil {
74                 ctx.LogE("hdr-write-write", nil, err, func(les LEs) string {
75                         return "Header writing: writing"
76                 })
77                 os.Remove(tmpHdr.Name())
78                 return err
79         }
80         if err = tmpHdr.Close(); err != nil {
81                 ctx.LogE("hdr-write-close", nil, err, func(les LEs) string {
82                         return "Header writing: closing"
83                 })
84                 os.Remove(tmpHdr.Name())
85                 return err
86         }
87         if err = ensureDir(filepath.Dir(tgt), HdrDir); err != nil {
88                 ctx.LogE("hdr-write-ensure-mkdir", nil, err, func(les LEs) string {
89                         return "Header writing: ensuring directory"
90                 })
91                 return err
92         }
93         if err = os.Rename(tmpHdr.Name(), JobPath2Hdr(tgt)); err != nil {
94                 ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string {
95                         return "Header writing: renaming"
96                 })
97                 return err
98         }
99         return err
100 }
101
102 func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job {
103         rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
104         jobs := make(chan Job, 16)
105         go func() {
106                 defer close(jobs)
107                 dir, err := os.Open(rxPath)
108                 if err != nil {
109                         return
110                 }
111                 fis, err := dir.Readdir(0)
112                 dir.Close()
113                 if err != nil {
114                         return
115                 }
116                 for _, fi := range fis {
117                         name := fi.Name()
118                         var hshValue []byte
119                         if nock {
120                                 if !strings.HasSuffix(name, NoCKSuffix) ||
121                                         len(name) != Base32Encoded32Len+len(NoCKSuffix) {
122                                         continue
123                                 }
124                                 hshValue, err = Base32Codec.DecodeString(
125                                         strings.TrimSuffix(name, NoCKSuffix),
126                                 )
127                         } else if part {
128                                 if !strings.HasSuffix(name, PartSuffix) ||
129                                         len(name) != Base32Encoded32Len+len(PartSuffix) {
130                                         continue
131                                 }
132                                 hshValue, err = Base32Codec.DecodeString(
133                                         strings.TrimSuffix(name, PartSuffix),
134                                 )
135                         } else {
136                                 if len(name) != Base32Encoded32Len {
137                                         continue
138                                 }
139                                 hshValue, err = Base32Codec.DecodeString(name)
140                         }
141                         if err != nil {
142                                 continue
143                         }
144                         pth := filepath.Join(rxPath, name)
145                         hdrExists := true
146                         var fd *os.File
147                         if nock || part {
148                                 fd, err = os.Open(pth)
149                         } else {
150                                 fd, err = os.Open(JobPath2Hdr(pth))
151                                 if err != nil && os.IsNotExist(err) {
152                                         hdrExists = false
153                                         fd, err = os.Open(pth)
154                                 }
155                         }
156                         if err != nil {
157                                 continue
158                         }
159                         if part {
160                                 job := Job{
161                                         Path:     pth,
162                                         Size:     fi.Size(),
163                                         HshValue: new([MTHSize]byte),
164                                 }
165                                 copy(job.HshValue[:], hshValue)
166                                 jobs <- job
167                                 continue
168                         }
169                         pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
170                         fd.Close()
171                         if err != nil {
172                                 continue
173                         }
174                         switch pktEnc.Magic {
175                         case MagicNNCPEv1.B:
176                                 err = MagicNNCPEv1.TooOld()
177                         case MagicNNCPEv2.B:
178                                 err = MagicNNCPEv2.TooOld()
179                         case MagicNNCPEv3.B:
180                                 err = MagicNNCPEv3.TooOld()
181                         case MagicNNCPEv4.B:
182                                 err = MagicNNCPEv4.TooOld()
183                         case MagicNNCPEv5.B:
184                                 err = MagicNNCPEv5.TooOld()
185                         case MagicNNCPEv6.B:
186                         default:
187                                 err = BadMagic
188                         }
189                         if err != nil {
190                                 ctx.LogE("job", LEs{
191                                         {"XX", string(xx)},
192                                         {"Name", name},
193                                         {"Size", fi.Size()},
194                                 }, err, func(les LEs) string {
195                                         return fmt.Sprintf(
196                                                 "Job %s/%s size: %s",
197                                                 string(xx), name,
198                                                 humanize.IBytes(uint64(fi.Size())),
199                                         )
200                                 })
201                                 continue
202                         }
203                         ctx.LogD("job", LEs{
204                                 {"XX", string(xx)},
205                                 {"Node", pktEnc.Sender},
206                                 {"Name", name},
207                                 {"Nice", int(pktEnc.Nice)},
208                                 {"Size", fi.Size()},
209                         }, func(les LEs) string {
210                                 return fmt.Sprintf(
211                                         "Job %s/%s/%s nice: %s size: %s",
212                                         pktEnc.Sender, string(xx), name,
213                                         NicenessFmt(pktEnc.Nice),
214                                         humanize.IBytes(uint64(fi.Size())),
215                                 )
216                         })
217                         if !hdrExists && ctx.HdrUsage {
218                                 ctx.HdrWrite(pktEncRaw, pth)
219                         }
220                         job := Job{
221                                 PktEnc:   pktEnc,
222                                 Path:     pth,
223                                 Size:     fi.Size(),
224                                 HshValue: new([MTHSize]byte),
225                         }
226                         copy(job.HshValue[:], hshValue)
227                         jobs <- job
228                 }
229         }()
230         return jobs
231 }
232
233 func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
234         return ctx.jobsFind(nodeId, xx, false, false)
235 }
236
237 func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
238         return ctx.jobsFind(nodeId, TRx, true, false)
239 }
240
241 func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job {
242         return ctx.jobsFind(nodeId, TRx, false, true)
243 }