Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/toss.go
Move lockdir to separate file
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "compress/zlib"
24         "fmt"
25         "io"
26         "io/ioutil"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "golang.org/x/crypto/blake2b"
39 )
40
41 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
42         return strings.NewReader(fmt.Sprintf(
43                 "From: %s\nTo: %s\nSubject: %s\n",
44                 fromTo.From,
45                 fromTo.To,
46                 mime.BEncoding.Encode("UTF-8", subject),
47         ))
48 }
49
50 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool {
51         dirLock, err := ctx.LockDir(nodeId, TRx)
52         if err != nil {
53                 return false
54         }
55         defer ctx.UnlockDir(dirLock)
56         isBad := false
57         for job := range ctx.Jobs(nodeId, TRx) {
58                 pktName := filepath.Base(job.Fd.Name())
59                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
60                 if job.PktEnc.Nice > nice {
61                         ctx.LogD("rx", SdsAdd(sds, SDS{
62                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
63                         }), "too nice")
64                         continue
65                 }
66                 pipeR, pipeW := io.Pipe()
67                 errs := make(chan error, 1)
68                 go func(job Job) {
69                         pipeWB := bufio.NewWriter(pipeW)
70                         _, _, err := PktEncRead(
71                                 ctx.Self,
72                                 ctx.Neigh,
73                                 bufio.NewReader(job.Fd),
74                                 pipeWB,
75                         )
76                         errs <- err
77                         pipeWB.Flush()
78                         pipeW.Close()
79                         job.Fd.Close()
80                         if err != nil {
81                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
82                         }
83                 }(job)
84                 var pkt Pkt
85                 var err error
86                 var pktSize int64
87                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
88                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
89                         isBad = true
90                         goto Closing
91                 }
92                 pktSize = job.Size - PktEncOverhead - PktOverhead
93                 sds["size"] = strconv.FormatInt(pktSize, 10)
94                 ctx.LogD("rx", sds, "taken")
95                 switch pkt.Type {
96                 case PktTypeMail:
97                         recipients := string(pkt.Path[:int(pkt.PathLen)])
98                         sds := SdsAdd(sds, SDS{
99                                 "type": "mail",
100                                 "dst":  recipients,
101                         })
102                         decompressor, err := zlib.NewReader(pipeR)
103                         if err != nil {
104                                 log.Fatalln(err)
105                         }
106                         sendmail := ctx.Neigh[*job.PktEnc.Sender].Sendmail
107                         if len(sendmail) == 0 {
108                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No sendmail configured"}), "")
109                                 isBad = true
110                                 goto Closing
111                         }
112                         if !dryRun {
113                                 cmd := exec.Command(
114                                         sendmail[0],
115                                         append(
116                                                 sendmail[1:len(sendmail)],
117                                                 strings.Split(recipients, " ")...,
118                                         )...,
119                                 )
120                                 cmd.Stdin = decompressor
121                                 if err = cmd.Run(); err != nil {
122                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
123                                         isBad = true
124                                         goto Closing
125                                 }
126                         }
127                         ctx.LogI("rx", sds, "")
128                         if !dryRun {
129                                 if err = os.Remove(job.Fd.Name()); err != nil {
130                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
131                                         isBad = true
132                                 }
133                         }
134                 case PktTypeFile:
135                         dst := string(pkt.Path[:int(pkt.PathLen)])
136                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
137                         if filepath.IsAbs(dst) {
138                                 ctx.LogE("rx", sds, "non-relative destination path")
139                                 isBad = true
140                                 goto Closing
141                         }
142                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
143                         if incoming == nil {
144                                 ctx.LogE("rx", sds, "incoming is not allowed")
145                                 isBad = true
146                                 goto Closing
147                         }
148                         dir := filepath.Join(*incoming, path.Dir(dst))
149                         if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
150                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
151                                 isBad = true
152                                 goto Closing
153                         }
154                         if !dryRun {
155                                 tmp, err := ioutil.TempFile(dir, "nncp-file")
156                                 sds["tmp"] = tmp.Name()
157                                 ctx.LogD("rx", sds, "created")
158                                 if err != nil {
159                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
160                                         isBad = true
161                                         goto Closing
162                                 }
163                                 bufW := bufio.NewWriter(tmp)
164                                 if _, err = io.Copy(bufW, pipeR); err != nil {
165                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
166                                         isBad = true
167                                         goto Closing
168                                 }
169                                 bufW.Flush()
170                                 tmp.Sync()
171                                 tmp.Close()
172                                 dstPathOrig := filepath.Join(*incoming, dst)
173                                 dstPath := dstPathOrig
174                                 dstPathCtr := 0
175                                 for {
176                                         if _, err = os.Stat(dstPath); err != nil {
177                                                 if os.IsNotExist(err) {
178                                                         break
179                                                 }
180                                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
181                                                 isBad = true
182                                                 goto Closing
183                                         }
184                                         dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
185                                         dstPathCtr++
186                                 }
187                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
188                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
189                                         isBad = true
190                                 }
191                                 delete(sds, "tmp")
192                         }
193                         ctx.LogI("rx", sds, "")
194                         if !dryRun {
195                                 if err = os.Remove(job.Fd.Name()); err != nil {
196                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
197                                         isBad = true
198                                 }
199                                 sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
200                                 if ctx.NotifyFile != nil {
201                                         cmd := exec.Command(
202                                                 sendmail[0],
203                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
204                                         )
205                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
206                                                 "File from %s: %s (%s)",
207                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
208                                                 dst,
209                                                 humanize.IBytes(uint64(pktSize)),
210                                         ))
211                                         cmd.Run()
212                                 }
213                         }
214                 case PktTypeFreq:
215                         src := string(pkt.Path[:int(pkt.PathLen)])
216                         if filepath.IsAbs(src) {
217                                 ctx.LogE("rx", sds, "non-relative source path")
218                                 isBad = true
219                                 goto Closing
220                         }
221                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
222                         dstRaw, err := ioutil.ReadAll(pipeR)
223                         if err != nil {
224                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
225                                 isBad = true
226                                 goto Closing
227                         }
228                         dst := string(dstRaw)
229                         sds["dst"] = dst
230                         sender := ctx.Neigh[*job.PktEnc.Sender]
231                         freq := sender.Freq
232                         if freq == nil {
233                                 ctx.LogE("rx", sds, "freqing is not allowed")
234                                 isBad = true
235                                 goto Closing
236                         }
237                         if !dryRun {
238                                 if sender.FreqChunked == 0 {
239                                         err = ctx.TxFile(
240                                                 sender,
241                                                 job.PktEnc.Nice,
242                                                 filepath.Join(*freq, src),
243                                                 dst,
244                                                 sender.FreqMinSize,
245                                         )
246                                 } else {
247                                         err = ctx.TxFileChunked(
248                                                 sender,
249                                                 job.PktEnc.Nice,
250                                                 filepath.Join(*freq, src),
251                                                 dst,
252                                                 sender.FreqMinSize,
253                                                 sender.FreqChunked,
254                                         )
255                                 }
256                                 if err != nil {
257                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
258                                         isBad = true
259                                         goto Closing
260                                 }
261                         }
262                         ctx.LogI("rx", sds, "")
263                         if !dryRun {
264                                 if err = os.Remove(job.Fd.Name()); err != nil {
265                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
266                                         isBad = true
267                                 }
268                                 if ctx.NotifyFreq != nil {
269                                         sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
270                                         cmd := exec.Command(
271                                                 sendmail[0],
272                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
273                                         )
274                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
275                                                 "Freq from %s: %s",
276                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
277                                                 src,
278                                         ))
279                                         cmd.Run()
280                                 }
281                         }
282                 case PktTypeTrns:
283                         dst := new([blake2b.Size256]byte)
284                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
285                         nodeId := NodeId(*dst)
286                         node, known := ctx.Neigh[nodeId]
287                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
288                         if !known {
289                                 ctx.LogE("rx", sds, "unknown node")
290                                 isBad = true
291                                 goto Closing
292                         }
293                         ctx.LogD("rx", sds, "taken")
294                         if !dryRun {
295                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
296                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
297                                         isBad = true
298                                         goto Closing
299                                 }
300                         }
301                         ctx.LogI("rx", sds, "")
302                         if !dryRun {
303                                 if err = os.Remove(job.Fd.Name()); err != nil {
304                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
305                                         isBad = true
306                                 }
307                         }
308                 default:
309                         ctx.LogE("rx", sds, "unknown type")
310                         isBad = true
311                 }
312         Closing:
313                 pipeR.Close()
314         }
315         return isBad
316 }