]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Merge branch 'develop'
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36
37         xdr "github.com/davecgh/go-xdr/xdr2"
38         "github.com/dustin/go-humanize"
39         "github.com/klauspost/compress/zstd"
40         "golang.org/x/crypto/blake2b"
41         "golang.org/x/crypto/poly1305"
42 )
43
44 const (
45         SeenSuffix = ".seen"
46 )
47
48 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
49         lines := []string{
50                 "From: " + fromTo.From,
51                 "To: " + fromTo.To,
52                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
53         }
54         if len(body) > 0 {
55                 lines = append(lines, []string{
56                         "MIME-Version: 1.0",
57                         "Content-Type: text/plain; charset=utf-8",
58                         "Content-Transfer-Encoding: base64",
59                         "",
60                         base64.StdEncoding.EncodeToString(body),
61                 }...)
62         }
63         return strings.NewReader(strings.Join(lines, "\n"))
64 }
65
66 func (ctx *Ctx) Toss(
67         nodeId *NodeId,
68         nice uint8,
69         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
70 ) bool {
71         dirLock, err := ctx.LockDir(nodeId, "toss")
72         if err != nil {
73                 ctx.LogE("rx", SDS{}, err, "lock")
74                 return false
75         }
76         defer ctx.UnlockDir(dirLock)
77         isBad := false
78         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
79         decompressor, err := zstd.NewReader(nil)
80         if err != nil {
81                 panic(err)
82         }
83         defer decompressor.Close()
84         for job := range ctx.Jobs(nodeId, TRx) {
85                 pktName := filepath.Base(job.Fd.Name())
86                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
87                 if job.PktEnc.Nice > nice {
88                         ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
89                         continue
90                 }
91                 pipeR, pipeW := io.Pipe()
92                 errs := make(chan error, 1)
93                 go func(job Job) {
94                         pipeWB := bufio.NewWriter(pipeW)
95                         _, _, err := PktEncRead(
96                                 ctx.Self,
97                                 ctx.Neigh,
98                                 bufio.NewReader(job.Fd),
99                                 pipeWB,
100                         )
101                         errs <- err
102                         pipeWB.Flush()
103                         pipeW.Close()
104                         job.Fd.Close()
105                         if err != nil {
106                                 ctx.LogE("rx", sds, err, "decryption")
107                         }
108                 }(job)
109                 var pkt Pkt
110                 var err error
111                 var pktSize int64
112                 var pktSizeBlocks int64
113                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
114                         ctx.LogE("rx", sds, err, "unmarshal")
115                         isBad = true
116                         goto Closing
117                 }
118                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
119                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
120                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
121                         pktSize -= poly1305.TagSize
122                 }
123                 pktSize -= pktSizeBlocks * poly1305.TagSize
124                 sds["size"] = pktSize
125                 ctx.LogD("rx", sds, "taken")
126                 switch pkt.Type {
127                 case PktTypeExec:
128                         if noExec {
129                                 goto Closing
130                         }
131                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
132                         handle := string(path[0])
133                         args := make([]string, 0, len(path)-1)
134                         for _, p := range path[1:] {
135                                 args = append(args, string(p))
136                         }
137                         argsStr := strings.Join(append([]string{handle}, args...), " ")
138                         sds := SdsAdd(sds, SDS{
139                                 "type": "exec",
140                                 "dst":  argsStr,
141                         })
142                         sender := ctx.Neigh[*job.PktEnc.Sender]
143                         cmdline, exists := sender.Exec[handle]
144                         if !exists || len(cmdline) == 0 {
145                                 ctx.LogE("rx", sds, errors.New("No handle found"), "")
146                                 isBad = true
147                                 goto Closing
148                         }
149                         if err = decompressor.Reset(pipeR); err != nil {
150                                 log.Fatalln(err)
151                         }
152                         if !dryRun {
153                                 cmd := exec.Command(
154                                         cmdline[0],
155                                         append(cmdline[1:len(cmdline)], args...)...,
156                                 )
157                                 cmd.Env = append(
158                                         cmd.Env,
159                                         "NNCP_SELF="+ctx.Self.Id.String(),
160                                         "NNCP_SENDER="+sender.Id.String(),
161                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
162                                 )
163                                 cmd.Stdin = decompressor
164                                 output, err := cmd.Output()
165                                 if err != nil {
166                                         ctx.LogE("rx", sds, err, "handle")
167                                         isBad = true
168                                         goto Closing
169                                 }
170                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
171                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
172                                         if !exists {
173                                                 notify, exists = ctx.NotifyExec["*."+handle]
174                                         }
175                                         if exists {
176                                                 cmd := exec.Command(
177                                                         sendmail[0],
178                                                         append(sendmail[1:len(sendmail)], notify.To)...,
179                                                 )
180                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
181                                                         "Exec from %s: %s", sender.Name, argsStr,
182                                                 ), output)
183                                                 cmd.Run()
184                                         }
185                                 }
186                         }
187                         ctx.LogI("rx", sds, "")
188                         if !dryRun {
189                                 if doSeen {
190                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
191                                                 fd.Close()
192                                         }
193                                 }
194                                 if err = os.Remove(job.Fd.Name()); err != nil {
195                                         ctx.LogE("rx", sds, err, "remove")
196                                         isBad = true
197                                 }
198                         }
199                 case PktTypeFile:
200                         if noFile {
201                                 goto Closing
202                         }
203                         dst := string(pkt.Path[:int(pkt.PathLen)])
204                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
205                         if filepath.IsAbs(dst) {
206                                 ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
207                                 isBad = true
208                                 goto Closing
209                         }
210                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
211                         if incoming == nil {
212                                 ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
213                                 isBad = true
214                                 goto Closing
215                         }
216                         dir := filepath.Join(*incoming, path.Dir(dst))
217                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
218                                 ctx.LogE("rx", sds, err, "mkdir")
219                                 isBad = true
220                                 goto Closing
221                         }
222                         if !dryRun {
223                                 tmp, err := TempFile(dir, "file")
224                                 if err != nil {
225                                         ctx.LogE("rx", sds, err, "mktemp")
226                                         isBad = true
227                                         goto Closing
228                                 }
229                                 sds["tmp"] = tmp.Name()
230                                 ctx.LogD("rx", sds, "created")
231                                 bufW := bufio.NewWriter(tmp)
232                                 if _, err = CopyProgressed(
233                                         bufW, pipeR, "Rx file",
234                                         SdsAdd(sds, SDS{"fullsize": sds["size"]}),
235                                         ctx.ShowPrgrs,
236                                 ); err != nil {
237                                         ctx.LogE("rx", sds, err, "copy")
238                                         isBad = true
239                                         goto Closing
240                                 }
241                                 if err = bufW.Flush(); err != nil {
242                                         tmp.Close()
243                                         ctx.LogE("rx", sds, err, "copy")
244                                         isBad = true
245                                         goto Closing
246                                 }
247                                 if err = tmp.Sync(); err != nil {
248                                         tmp.Close()
249                                         ctx.LogE("rx", sds, err, "copy")
250                                         isBad = true
251                                         goto Closing
252                                 }
253                                 tmp.Close()
254                                 dstPathOrig := filepath.Join(*incoming, dst)
255                                 dstPath := dstPathOrig
256                                 dstPathCtr := 0
257                                 for {
258                                         if _, err = os.Stat(dstPath); err != nil {
259                                                 if os.IsNotExist(err) {
260                                                         break
261                                                 }
262                                                 ctx.LogE("rx", sds, err, "stat")
263                                                 isBad = true
264                                                 goto Closing
265                                         }
266                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
267                                         dstPathCtr++
268                                 }
269                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
270                                         ctx.LogE("rx", sds, err, "rename")
271                                         isBad = true
272                                 }
273                                 if err = DirSync(*incoming); err != nil {
274                                         ctx.LogE("rx", sds, err, "sync")
275                                         isBad = true
276                                 }
277                                 delete(sds, "tmp")
278                         }
279                         ctx.LogI("rx", sds, "")
280                         if !dryRun {
281                                 if doSeen {
282                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
283                                                 fd.Close()
284                                         }
285                                 }
286                                 if err = os.Remove(job.Fd.Name()); err != nil {
287                                         ctx.LogE("rx", sds, err, "remove")
288                                         isBad = true
289                                 }
290                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
291                                         cmd := exec.Command(
292                                                 sendmail[0],
293                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
294                                         )
295                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
296                                                 "File from %s: %s (%s)",
297                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
298                                                 dst,
299                                                 humanize.IBytes(uint64(pktSize)),
300                                         ), nil)
301                                         cmd.Run()
302                                 }
303                         }
304                 case PktTypeFreq:
305                         if noFreq {
306                                 goto Closing
307                         }
308                         src := string(pkt.Path[:int(pkt.PathLen)])
309                         if filepath.IsAbs(src) {
310                                 ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
311                                 isBad = true
312                                 goto Closing
313                         }
314                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
315                         dstRaw, err := ioutil.ReadAll(pipeR)
316                         if err != nil {
317                                 ctx.LogE("rx", sds, err, "read")
318                                 isBad = true
319                                 goto Closing
320                         }
321                         dst := string(dstRaw)
322                         sds["dst"] = dst
323                         sender := ctx.Neigh[*job.PktEnc.Sender]
324                         freqPath := sender.FreqPath
325                         if freqPath == nil {
326                                 ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
327                                 isBad = true
328                                 goto Closing
329                         }
330                         if !dryRun {
331                                 err = ctx.TxFile(
332                                         sender,
333                                         pkt.Nice,
334                                         filepath.Join(*freqPath, src),
335                                         dst,
336                                         sender.FreqChunked,
337                                         sender.FreqMinSize,
338                                         sender.FreqMaxSize,
339                                 )
340                                 if err != nil {
341                                         ctx.LogE("rx", sds, err, "tx file")
342                                         isBad = true
343                                         goto Closing
344                                 }
345                         }
346                         ctx.LogI("rx", sds, "")
347                         if !dryRun {
348                                 if doSeen {
349                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
350                                                 fd.Close()
351                                         }
352                                 }
353                                 if err = os.Remove(job.Fd.Name()); err != nil {
354                                         ctx.LogE("rx", sds, err, "remove")
355                                         isBad = true
356                                 }
357                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
358                                         cmd := exec.Command(
359                                                 sendmail[0],
360                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
361                                         )
362                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
363                                                 "Freq from %s: %s", sender.Name, src,
364                                         ), nil)
365                                         cmd.Run()
366                                 }
367                         }
368                 case PktTypeTrns:
369                         if noTrns {
370                                 goto Closing
371                         }
372                         dst := new([blake2b.Size256]byte)
373                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
374                         nodeId := NodeId(*dst)
375                         node, known := ctx.Neigh[nodeId]
376                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
377                         if !known {
378                                 ctx.LogE("rx", sds, errors.New("unknown node"), "")
379                                 isBad = true
380                                 goto Closing
381                         }
382                         ctx.LogD("rx", sds, "taken")
383                         if !dryRun {
384                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
385                                         ctx.LogE("rx", sds, err, "tx trns")
386                                         isBad = true
387                                         goto Closing
388                                 }
389                         }
390                         ctx.LogI("rx", sds, "")
391                         if !dryRun {
392                                 if doSeen {
393                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
394                                                 fd.Close()
395                                         }
396                                 }
397                                 if err = os.Remove(job.Fd.Name()); err != nil {
398                                         ctx.LogE("rx", sds, err, "remove")
399                                         isBad = true
400                                 }
401                         }
402                 default:
403                         ctx.LogE("rx", sds, errors.New("unknown type"), "")
404                         isBad = true
405                 }
406         Closing:
407                 pipeR.Close()
408         }
409         return isBad
410 }