]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/toss.go
Merge branch 'develop'
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "compress/zlib"
24         "fmt"
25         "io"
26         "io/ioutil"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "golang.org/x/crypto/blake2b"
39         "golang.org/x/sys/unix"
40 )
41
42 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
43         return strings.NewReader(fmt.Sprintf(
44                 "From: %s\nTo: %s\nSubject: %s\n",
45                 fromTo.From,
46                 fromTo.To,
47                 mime.BEncoding.Encode("UTF-8", subject),
48         ))
49 }
50
51 func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
52         ctx.ensureRxDir(nodeId)
53         lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
54         dirLock, err := os.OpenFile(
55                 lockPath,
56                 os.O_CREATE|os.O_WRONLY,
57                 os.FileMode(0600),
58         )
59         if err != nil {
60                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
61                 return nil, err
62         }
63         err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
64         if err != nil {
65                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
66                 dirLock.Close()
67                 return nil, err
68         }
69         return dirLock, nil
70 }
71
72 func (ctx *Ctx) UnlockDir(fd *os.File) {
73         if fd != nil {
74                 unix.Flock(int(fd.Fd()), unix.LOCK_UN)
75                 fd.Close()
76         }
77 }
78
79 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool {
80         dirLock, err := ctx.LockDir(nodeId, TRx)
81         if err != nil {
82                 return false
83         }
84         defer ctx.UnlockDir(dirLock)
85         isBad := false
86         for job := range ctx.Jobs(nodeId, TRx) {
87                 pktName := filepath.Base(job.Fd.Name())
88                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
89                 if job.PktEnc.Nice > nice {
90                         ctx.LogD("rx", SdsAdd(sds, SDS{
91                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
92                         }), "too nice")
93                         continue
94                 }
95                 pipeR, pipeW := io.Pipe()
96                 errs := make(chan error, 1)
97                 go func(job Job) {
98                         pipeWB := bufio.NewWriter(pipeW)
99                         _, _, err := PktEncRead(
100                                 ctx.Self,
101                                 ctx.Neigh,
102                                 bufio.NewReader(job.Fd),
103                                 pipeWB,
104                         )
105                         errs <- err
106                         pipeWB.Flush()
107                         pipeW.Close()
108                         job.Fd.Close()
109                         if err != nil {
110                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
111                         }
112                 }(job)
113                 var pkt Pkt
114                 var err error
115                 var pktSize int64
116                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
117                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
118                         isBad = true
119                         goto Closing
120                 }
121                 pktSize = job.Size - PktEncOverhead - PktOverhead
122                 sds["size"] = strconv.FormatInt(pktSize, 10)
123                 ctx.LogD("rx", sds, "taken")
124                 switch pkt.Type {
125                 case PktTypeMail:
126                         recipients := string(pkt.Path[:int(pkt.PathLen)])
127                         sds := SdsAdd(sds, SDS{
128                                 "type": "mail",
129                                 "dst":  recipients,
130                         })
131                         decompressor, err := zlib.NewReader(pipeR)
132                         if err != nil {
133                                 log.Fatalln(err)
134                         }
135                         sendmail := ctx.Neigh[*job.PktEnc.Sender].Sendmail
136                         if len(sendmail) == 0 {
137                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No sendmail configured"}), "")
138                                 isBad = true
139                                 goto Closing
140                         }
141                         if !dryRun {
142                                 cmd := exec.Command(
143                                         sendmail[0],
144                                         append(
145                                                 sendmail[1:len(sendmail)],
146                                                 strings.Split(recipients, " ")...,
147                                         )...,
148                                 )
149                                 cmd.Stdin = decompressor
150                                 if err = cmd.Run(); err != nil {
151                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
152                                         isBad = true
153                                         goto Closing
154                                 }
155                         }
156                         ctx.LogI("rx", sds, "")
157                         if !dryRun {
158                                 if err = os.Remove(job.Fd.Name()); err != nil {
159                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
160                                         isBad = true
161                                 }
162                         }
163                 case PktTypeFile:
164                         dst := string(pkt.Path[:int(pkt.PathLen)])
165                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
166                         if filepath.IsAbs(dst) {
167                                 ctx.LogE("rx", sds, "non-relative destination path")
168                                 isBad = true
169                                 goto Closing
170                         }
171                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
172                         if incoming == nil {
173                                 ctx.LogE("rx", sds, "incoming is not allowed")
174                                 isBad = true
175                                 goto Closing
176                         }
177                         dir := filepath.Join(*incoming, path.Dir(dst))
178                         if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
179                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
180                                 isBad = true
181                                 goto Closing
182                         }
183                         if !dryRun {
184                                 tmp, err := ioutil.TempFile(dir, "nncp-file")
185                                 sds["tmp"] = tmp.Name()
186                                 ctx.LogD("rx", sds, "created")
187                                 if err != nil {
188                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
189                                         isBad = true
190                                         goto Closing
191                                 }
192                                 bufW := bufio.NewWriter(tmp)
193                                 if _, err = io.Copy(bufW, pipeR); err != nil {
194                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
195                                         isBad = true
196                                         goto Closing
197                                 }
198                                 bufW.Flush()
199                                 tmp.Sync()
200                                 tmp.Close()
201                                 dstPathOrig := filepath.Join(*incoming, dst)
202                                 dstPath := dstPathOrig
203                                 dstPathCtr := 0
204                                 for {
205                                         if _, err = os.Stat(dstPath); err != nil {
206                                                 if os.IsNotExist(err) {
207                                                         break
208                                                 }
209                                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
210                                                 isBad = true
211                                                 goto Closing
212                                         }
213                                         dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
214                                         dstPathCtr++
215                                 }
216                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
217                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
218                                         isBad = true
219                                 }
220                                 delete(sds, "tmp")
221                         }
222                         ctx.LogI("rx", sds, "")
223                         if !dryRun {
224                                 if err = os.Remove(job.Fd.Name()); err != nil {
225                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
226                                         isBad = true
227                                 }
228                                 sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
229                                 if ctx.NotifyFile != nil {
230                                         cmd := exec.Command(
231                                                 sendmail[0],
232                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
233                                         )
234                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
235                                                 "File from %s: %s (%s)",
236                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
237                                                 dst,
238                                                 humanize.IBytes(uint64(pktSize)),
239                                         ))
240                                         cmd.Run()
241                                 }
242                         }
243                 case PktTypeFreq:
244                         src := string(pkt.Path[:int(pkt.PathLen)])
245                         if filepath.IsAbs(src) {
246                                 ctx.LogE("rx", sds, "non-relative source path")
247                                 isBad = true
248                                 goto Closing
249                         }
250                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
251                         dstRaw, err := ioutil.ReadAll(pipeR)
252                         if err != nil {
253                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
254                                 isBad = true
255                                 goto Closing
256                         }
257                         dst := string(dstRaw)
258                         sds["dst"] = dst
259                         sender := ctx.Neigh[*job.PktEnc.Sender]
260                         freq := sender.Freq
261                         if freq == nil {
262                                 ctx.LogE("rx", sds, "freqing is not allowed")
263                                 isBad = true
264                                 goto Closing
265                         }
266                         if !dryRun {
267                                 if sender.FreqChunked == 0 {
268                                         err = ctx.TxFile(
269                                                 sender,
270                                                 job.PktEnc.Nice,
271                                                 filepath.Join(*freq, src),
272                                                 dst,
273                                                 sender.FreqMinSize,
274                                         )
275                                 } else {
276                                         err = ctx.TxFileChunked(
277                                                 sender,
278                                                 job.PktEnc.Nice,
279                                                 filepath.Join(*freq, src),
280                                                 dst,
281                                                 sender.FreqMinSize,
282                                                 sender.FreqChunked,
283                                         )
284                                 }
285                                 if err != nil {
286                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
287                                         isBad = true
288                                         goto Closing
289                                 }
290                         }
291                         ctx.LogI("rx", sds, "")
292                         if !dryRun {
293                                 if err = os.Remove(job.Fd.Name()); err != nil {
294                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
295                                         isBad = true
296                                 }
297                                 if ctx.NotifyFreq != nil {
298                                         sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
299                                         cmd := exec.Command(
300                                                 sendmail[0],
301                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
302                                         )
303                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
304                                                 "Freq from %s: %s",
305                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
306                                                 src,
307                                         ))
308                                         cmd.Run()
309                                 }
310                         }
311                 case PktTypeTrns:
312                         dst := new([blake2b.Size256]byte)
313                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
314                         nodeId := NodeId(*dst)
315                         node, known := ctx.Neigh[nodeId]
316                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
317                         if !known {
318                                 ctx.LogE("rx", sds, "unknown node")
319                                 isBad = true
320                                 goto Closing
321                         }
322                         ctx.LogD("rx", sds, "taken")
323                         if !dryRun {
324                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
325                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
326                                         isBad = true
327                                         goto Closing
328                                 }
329                         }
330                         ctx.LogI("rx", sds, "")
331                         if !dryRun {
332                                 if err = os.Remove(job.Fd.Name()); err != nil {
333                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
334                                         isBad = true
335                                 }
336                         }
337                 default:
338                         ctx.LogE("rx", sds, "unknown type")
339                         isBad = true
340                 }
341         Closing:
342                 pipeR.Close()
343         }
344         return isBad
345 }