]> Cypherpunks.ru repositories - nncp.git/blob - src/jobs.go
Merge branch 'develop'
[nncp.git] / src / jobs.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "os"
23         "path/filepath"
24         "strings"
25
26         xdr "github.com/davecgh/go-xdr/xdr2"
27 )
28
// TRxTx names a spool direction. Its string value is used as the name
// of the per-node spool subdirectory.
type TRxTx string

// Spool directions.
const (
	TRx TRxTx = "rx"
	TTx TRxTx = "tx"
)

// HdrSuffix is appended to a packet's path to form the name of the file
// holding a separate copy of that packet's encoded header.
const HdrSuffix = ".hdr"
37
38 type Job struct {
39         PktEnc   *PktEnc
40         Path     string
41         Size     int64
42         HshValue *[32]byte
43 }
44
45 func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) {
46         var pktEnc PktEnc
47         _, err := xdr.Unmarshal(fd, &pktEnc)
48         if err != nil {
49                 return nil, nil, err
50         }
51         var raw bytes.Buffer
52         if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
53                 panic(err)
54         }
55         return &pktEnc, raw.Bytes(), nil
56 }
57
58 func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
59         tmpHdr, err := ctx.NewTmpFile()
60         if err != nil {
61                 ctx.LogE("hdr-write", []LE{}, err, "new")
62                 return err
63         }
64         if _, err = tmpHdr.Write(pktEncRaw); err != nil {
65                 ctx.LogE("hdr-write", []LE{}, err, "write")
66                 os.Remove(tmpHdr.Name())
67                 return err
68         }
69         if err = tmpHdr.Close(); err != nil {
70                 ctx.LogE("hdr-write", []LE{}, err, "close")
71                 os.Remove(tmpHdr.Name())
72                 return err
73         }
74         if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
75                 ctx.LogE("hdr-write", []LE{}, err, "rename")
76                 return err
77         }
78         return err
79 }
80
81 func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job {
82         rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
83         jobs := make(chan Job, 16)
84         go func() {
85                 defer close(jobs)
86                 dir, err := os.Open(rxPath)
87                 if err != nil {
88                         return
89                 }
90                 fis, err := dir.Readdir(0)
91                 dir.Close() // #nosec G104
92                 if err != nil {
93                         return
94                 }
95                 for _, fi := range fis {
96                         name := fi.Name()
97                         var hshValue []byte
98                         if nock {
99                                 if !strings.HasSuffix(name, NoCKSuffix) ||
100                                         len(name) != Base32Encoded32Len+len(NoCKSuffix) {
101                                         continue
102                                 }
103                                 hshValue, err = Base32Codec.DecodeString(
104                                         strings.TrimSuffix(name, NoCKSuffix),
105                                 )
106                         } else {
107                                 if len(name) != Base32Encoded32Len {
108                                         continue
109                                 }
110                                 hshValue, err = Base32Codec.DecodeString(name)
111                         }
112                         if err != nil {
113                                 continue
114                         }
115                         pth := filepath.Join(rxPath, name)
116                         hdrExists := true
117                         var fd *os.File
118                         if nock {
119                                 fd, err = os.Open(pth)
120                         } else {
121                                 fd, err = os.Open(pth + HdrSuffix)
122                                 if err != nil && os.IsNotExist(err) {
123                                         hdrExists = false
124                                         fd, err = os.Open(pth)
125                                 }
126                         }
127                         if err != nil {
128                                 continue
129                         }
130                         pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
131                         fd.Close()
132                         if err != nil || pktEnc.Magic != MagicNNCPEv4 {
133                                 continue
134                         }
135                         ctx.LogD("jobs", LEs{
136                                 {"XX", string(xx)},
137                                 {"Node", pktEnc.Sender},
138                                 {"Name", name},
139                                 {"Nice", int(pktEnc.Nice)},
140                                 {"Size", fi.Size()},
141                         }, "taken")
142                         if !hdrExists && ctx.HdrUsage {
143                                 ctx.HdrWrite(pktEncRaw, pth)
144                         }
145                         job := Job{
146                                 PktEnc:   pktEnc,
147                                 Path:     pth,
148                                 Size:     fi.Size(),
149                                 HshValue: new([32]byte),
150                         }
151                         copy(job.HshValue[:], hshValue)
152                         jobs <- job
153                 }
154         }()
155         return jobs
156 }
157
158 func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
159         return ctx.jobsFind(nodeId, xx, false)
160 }
161
162 func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
163         return ctx.jobsFind(nodeId, TRx, true)
164 }