]> Cypherpunks.ru repositories - nncp.git/blob - src/jobs.go
Multicast areas
[nncp.git] / src / jobs.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "fmt"
23         "io"
24         "os"
25         "path/filepath"
26         "strings"
27
28         xdr "github.com/davecgh/go-xdr/xdr2"
29         "github.com/dustin/go-humanize"
30 )
31
// TRxTx names a spool direction. Its string value is used verbatim as the
// name of the per-node spool subdirectory that is scanned for jobs.
type TRxTx string
33
const (
	// TRx selects the "rx" spool subdirectory.
	TRx TRxTx = "rx"
	// TTx selects the "tx" spool subdirectory.
	TTx TRxTx = "tx"

	// HdrSuffix is appended to a packet's path to form the name of the
	// separate file holding only that packet's encoded header.
	HdrSuffix = ".hdr"
)
40
// Job describes a single packet file found in a node's spool directory.
type Job struct {
	// PktEnc is the decoded packet header. It is left nil for jobs
	// produced by a partial-file scan, where the header is not read.
	PktEnc *PktEnc
	// Path is the full path to the packet file inside the spool.
	Path string
	// Size is the size of the file as reported by the directory listing.
	Size int64
	// HshValue holds the bytes decoded from the Base32 part of the
	// file's name.
	HshValue *[MTHSize]byte
}
47
48 func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) {
49         var pktEnc PktEnc
50         _, err := xdr.Unmarshal(r, &pktEnc)
51         if err != nil {
52                 return nil, nil, err
53         }
54         var raw bytes.Buffer
55         if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
56                 panic(err)
57         }
58         return &pktEnc, raw.Bytes(), nil
59 }
60
61 func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
62         tmpHdr, err := ctx.NewTmpFile()
63         if err != nil {
64                 ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string {
65                         return "Header writing: new temporary file"
66                 })
67                 return err
68         }
69         if _, err = tmpHdr.Write(pktEncRaw); err != nil {
70                 ctx.LogE("hdr-write-write", nil, err, func(les LEs) string {
71                         return "Header writing: writing"
72                 })
73                 os.Remove(tmpHdr.Name())
74                 return err
75         }
76         if err = tmpHdr.Close(); err != nil {
77                 ctx.LogE("hdr-write-close", nil, err, func(les LEs) string {
78                         return "Header writing: closing"
79                 })
80                 os.Remove(tmpHdr.Name())
81                 return err
82         }
83         if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
84                 ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string {
85                         return "Header writing: renaming"
86                 })
87                 return err
88         }
89         return err
90 }
91
92 func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job {
93         rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
94         jobs := make(chan Job, 16)
95         go func() {
96                 defer close(jobs)
97                 dir, err := os.Open(rxPath)
98                 if err != nil {
99                         return
100                 }
101                 fis, err := dir.Readdir(0)
102                 dir.Close() // #nosec G104
103                 if err != nil {
104                         return
105                 }
106                 for _, fi := range fis {
107                         name := fi.Name()
108                         var hshValue []byte
109                         if nock {
110                                 if !strings.HasSuffix(name, NoCKSuffix) ||
111                                         len(name) != Base32Encoded32Len+len(NoCKSuffix) {
112                                         continue
113                                 }
114                                 hshValue, err = Base32Codec.DecodeString(
115                                         strings.TrimSuffix(name, NoCKSuffix),
116                                 )
117                         } else if part {
118                                 if !strings.HasSuffix(name, PartSuffix) ||
119                                         len(name) != Base32Encoded32Len+len(PartSuffix) {
120                                         continue
121                                 }
122                                 hshValue, err = Base32Codec.DecodeString(
123                                         strings.TrimSuffix(name, PartSuffix),
124                                 )
125                         } else {
126                                 if len(name) != Base32Encoded32Len {
127                                         continue
128                                 }
129                                 hshValue, err = Base32Codec.DecodeString(name)
130                         }
131                         if err != nil {
132                                 continue
133                         }
134                         pth := filepath.Join(rxPath, name)
135                         hdrExists := true
136                         var fd *os.File
137                         if nock || part {
138                                 fd, err = os.Open(pth)
139                         } else {
140                                 fd, err = os.Open(pth + HdrSuffix)
141                                 if err != nil && os.IsNotExist(err) {
142                                         hdrExists = false
143                                         fd, err = os.Open(pth)
144                                 }
145                         }
146                         if err != nil {
147                                 continue
148                         }
149                         if part {
150                                 job := Job{
151                                         Path:     pth,
152                                         Size:     fi.Size(),
153                                         HshValue: new([MTHSize]byte),
154                                 }
155                                 copy(job.HshValue[:], hshValue)
156                                 jobs <- job
157                                 continue
158                         }
159                         pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
160                         fd.Close()
161                         if err != nil {
162                                 continue
163                         }
164                         switch pktEnc.Magic {
165                         case MagicNNCPEv1.B:
166                                 err = MagicNNCPEv1.TooOld()
167                         case MagicNNCPEv2.B:
168                                 err = MagicNNCPEv2.TooOld()
169                         case MagicNNCPEv3.B:
170                                 err = MagicNNCPEv3.TooOld()
171                         case MagicNNCPEv4.B:
172                                 err = MagicNNCPEv4.TooOld()
173                         case MagicNNCPEv5.B:
174                         default:
175                                 err = BadMagic
176                         }
177                         if err != nil {
178                                 ctx.LogE("job", LEs{
179                                         {"XX", string(xx)},
180                                         {"Name", name},
181                                         {"Size", fi.Size()},
182                                 }, err, func(les LEs) string {
183                                         return fmt.Sprintf(
184                                                 "Job %s/%s size: %s",
185                                                 string(xx), name,
186                                                 humanize.IBytes(uint64(fi.Size())),
187                                         )
188                                 })
189                                 continue
190                         }
191                         ctx.LogD("job", LEs{
192                                 {"XX", string(xx)},
193                                 {"Node", pktEnc.Sender},
194                                 {"Name", name},
195                                 {"Nice", int(pktEnc.Nice)},
196                                 {"Size", fi.Size()},
197                         }, func(les LEs) string {
198                                 return fmt.Sprintf(
199                                         "Job %s/%s/%s nice: %s size: %s",
200                                         pktEnc.Sender, string(xx), name,
201                                         NicenessFmt(pktEnc.Nice),
202                                         humanize.IBytes(uint64(fi.Size())),
203                                 )
204                         })
205                         if !hdrExists && ctx.HdrUsage {
206                                 ctx.HdrWrite(pktEncRaw, pth)
207                         }
208                         job := Job{
209                                 PktEnc:   pktEnc,
210                                 Path:     pth,
211                                 Size:     fi.Size(),
212                                 HshValue: new([MTHSize]byte),
213                         }
214                         copy(job.HshValue[:], hshValue)
215                         jobs <- job
216                 }
217         }()
218         return jobs
219 }
220
221 func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
222         return ctx.jobsFind(nodeId, xx, false, false)
223 }
224
225 func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
226         return ctx.jobsFind(nodeId, TRx, true, false)
227 }
228
229 func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job {
230         return ctx.jobsFind(nodeId, TRx, false, true)
231 }