]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/toss.go
Freqed files have different niceness
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "compress/zlib"
24         "fmt"
25         "io"
26         "io/ioutil"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "golang.org/x/crypto/blake2b"
39 )
40
41 const (
42         SeenSuffix = ".seen"
43 )
44
45 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
46         return strings.NewReader(fmt.Sprintf(
47                 "From: %s\nTo: %s\nSubject: %s\n",
48                 fromTo.From,
49                 fromTo.To,
50                 mime.BEncoding.Encode("UTF-8", subject),
51         ))
52 }
53
54 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun, doSeen bool) bool {
55         isBad := false
56         for job := range ctx.Jobs(nodeId, TRx) {
57                 pktName := filepath.Base(job.Fd.Name())
58                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
59                 if job.PktEnc.Nice > nice {
60                         ctx.LogD("rx", SdsAdd(sds, SDS{
61                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
62                         }), "too nice")
63                         continue
64                 }
65                 pipeR, pipeW := io.Pipe()
66                 errs := make(chan error, 1)
67                 go func(job Job) {
68                         pipeWB := bufio.NewWriter(pipeW)
69                         _, _, err := PktEncRead(
70                                 ctx.Self,
71                                 ctx.Neigh,
72                                 bufio.NewReader(job.Fd),
73                                 pipeWB,
74                         )
75                         errs <- err
76                         pipeWB.Flush()
77                         pipeW.Close()
78                         job.Fd.Close()
79                         if err != nil {
80                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
81                         }
82                 }(job)
83                 var pkt Pkt
84                 var err error
85                 var pktSize int64
86                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
87                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
88                         isBad = true
89                         goto Closing
90                 }
91                 pktSize = job.Size - PktEncOverhead - PktOverhead
92                 sds["size"] = strconv.FormatInt(pktSize, 10)
93                 ctx.LogD("rx", sds, "taken")
94                 switch pkt.Type {
95                 case PktTypeMail:
96                         recipients := string(pkt.Path[:int(pkt.PathLen)])
97                         sds := SdsAdd(sds, SDS{
98                                 "type": "mail",
99                                 "dst":  recipients,
100                         })
101                         decompressor, err := zlib.NewReader(pipeR)
102                         if err != nil {
103                                 log.Fatalln(err)
104                         }
105                         sender := ctx.Neigh[*job.PktEnc.Sender]
106                         sendmail := sender.Sendmail
107                         if len(sendmail) == 0 {
108                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No sendmail configured"}), "")
109                                 isBad = true
110                                 goto Closing
111                         }
112                         if !dryRun {
113                                 cmd := exec.Command(
114                                         sendmail[0],
115                                         append(
116                                                 sendmail[1:len(sendmail)],
117                                                 strings.Split(recipients, " ")...,
118                                         )...,
119                                 )
120                                 cmd.Env = append(cmd.Env, "NNCP_SENDER="+sender.Id.String())
121                                 cmd.Env = append(cmd.Env, "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)))
122                                 cmd.Stdin = decompressor
123                                 if err = cmd.Run(); err != nil {
124                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
125                                         isBad = true
126                                         goto Closing
127                                 }
128                         }
129                         ctx.LogI("rx", sds, "")
130                         if !dryRun {
131                                 if doSeen {
132                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
133                                                 fd.Close()
134                                         }
135                                 }
136                                 if err = os.Remove(job.Fd.Name()); err != nil {
137                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
138                                         isBad = true
139                                 }
140                         }
141                 case PktTypeFile:
142                         dst := string(pkt.Path[:int(pkt.PathLen)])
143                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
144                         if filepath.IsAbs(dst) {
145                                 ctx.LogE("rx", sds, "non-relative destination path")
146                                 isBad = true
147                                 goto Closing
148                         }
149                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
150                         if incoming == nil {
151                                 ctx.LogE("rx", sds, "incoming is not allowed")
152                                 isBad = true
153                                 goto Closing
154                         }
155                         dir := filepath.Join(*incoming, path.Dir(dst))
156                         if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
157                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
158                                 isBad = true
159                                 goto Closing
160                         }
161                         if !dryRun {
162                                 tmp, err := ioutil.TempFile(dir, "nncp-file")
163                                 sds["tmp"] = tmp.Name()
164                                 ctx.LogD("rx", sds, "created")
165                                 if err != nil {
166                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
167                                         isBad = true
168                                         goto Closing
169                                 }
170                                 bufW := bufio.NewWriter(tmp)
171                                 if _, err = io.Copy(bufW, pipeR); err != nil {
172                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
173                                         isBad = true
174                                         goto Closing
175                                 }
176                                 bufW.Flush()
177                                 tmp.Sync()
178                                 tmp.Close()
179                                 dstPathOrig := filepath.Join(*incoming, dst)
180                                 dstPath := dstPathOrig
181                                 dstPathCtr := 0
182                                 for {
183                                         if _, err = os.Stat(dstPath); err != nil {
184                                                 if os.IsNotExist(err) {
185                                                         break
186                                                 }
187                                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
188                                                 isBad = true
189                                                 goto Closing
190                                         }
191                                         dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
192                                         dstPathCtr++
193                                 }
194                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
195                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
196                                         isBad = true
197                                 }
198                                 delete(sds, "tmp")
199                         }
200                         ctx.LogI("rx", sds, "")
201                         if !dryRun {
202                                 if doSeen {
203                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
204                                                 fd.Close()
205                                         }
206                                 }
207                                 if err = os.Remove(job.Fd.Name()); err != nil {
208                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
209                                         isBad = true
210                                 }
211                                 sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
212                                 if ctx.NotifyFile != nil {
213                                         cmd := exec.Command(
214                                                 sendmail[0],
215                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
216                                         )
217                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
218                                                 "File from %s: %s (%s)",
219                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
220                                                 dst,
221                                                 humanize.IBytes(uint64(pktSize)),
222                                         ))
223                                         cmd.Run()
224                                 }
225                         }
226                 case PktTypeFreq:
227                         src := string(pkt.Path[:int(pkt.PathLen)])
228                         if filepath.IsAbs(src) {
229                                 ctx.LogE("rx", sds, "non-relative source path")
230                                 isBad = true
231                                 goto Closing
232                         }
233                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
234                         dstRaw, err := ioutil.ReadAll(pipeR)
235                         if err != nil {
236                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
237                                 isBad = true
238                                 goto Closing
239                         }
240                         dst := string(dstRaw)
241                         sds["dst"] = dst
242                         sender := ctx.Neigh[*job.PktEnc.Sender]
243                         freq := sender.Freq
244                         if freq == nil {
245                                 ctx.LogE("rx", sds, "freqing is not allowed")
246                                 isBad = true
247                                 goto Closing
248                         }
249                         if !dryRun {
250                                 if sender.FreqChunked == 0 {
251                                         err = ctx.TxFile(
252                                                 sender,
253                                                 pkt.Nice,
254                                                 filepath.Join(*freq, src),
255                                                 dst,
256                                                 sender.FreqMinSize,
257                                         )
258                                 } else {
259                                         err = ctx.TxFileChunked(
260                                                 sender,
261                                                 pkt.Nice,
262                                                 filepath.Join(*freq, src),
263                                                 dst,
264                                                 sender.FreqMinSize,
265                                                 sender.FreqChunked,
266                                         )
267                                 }
268                                 if err != nil {
269                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
270                                         isBad = true
271                                         goto Closing
272                                 }
273                         }
274                         ctx.LogI("rx", sds, "")
275                         if !dryRun {
276                                 if doSeen {
277                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
278                                                 fd.Close()
279                                         }
280                                 }
281                                 if err = os.Remove(job.Fd.Name()); err != nil {
282                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
283                                         isBad = true
284                                 }
285                                 if ctx.NotifyFreq != nil {
286                                         sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
287                                         cmd := exec.Command(
288                                                 sendmail[0],
289                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
290                                         )
291                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
292                                                 "Freq from %s: %s",
293                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
294                                                 src,
295                                         ))
296                                         cmd.Run()
297                                 }
298                         }
299                 case PktTypeTrns:
300                         dst := new([blake2b.Size256]byte)
301                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
302                         nodeId := NodeId(*dst)
303                         node, known := ctx.Neigh[nodeId]
304                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
305                         if !known {
306                                 ctx.LogE("rx", sds, "unknown node")
307                                 isBad = true
308                                 goto Closing
309                         }
310                         ctx.LogD("rx", sds, "taken")
311                         if !dryRun {
312                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
313                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
314                                         isBad = true
315                                         goto Closing
316                                 }
317                         }
318                         ctx.LogI("rx", sds, "")
319                         if !dryRun {
320                                 if doSeen {
321                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
322                                                 fd.Close()
323                                         }
324                                 }
325                                 if err = os.Remove(job.Fd.Name()); err != nil {
326                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
327                                         isBad = true
328                                 }
329                         }
330                 default:
331                         ctx.LogE("rx", sds, "unknown type")
332                         isBad = true
333                 }
334         Closing:
335                 pipeR.Close()
336         }
337         return isBad
338 }