]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Merge branch 'develop'
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenSuffix = ".seen"
47 )
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(lines, []string{
57                         "MIME-Version: 1.0",
58                         "Content-Type: text/plain; charset=utf-8",
59                         "Content-Transfer-Encoding: base64",
60                         "",
61                         base64.StdEncoding.EncodeToString(body),
62                 }...)
63         }
64         return strings.NewReader(strings.Join(lines, "\n"))
65 }
66
67 func (ctx *Ctx) Toss(
68         nodeId *NodeId,
69         nice uint8,
70         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
71 ) bool {
72         dirLock, err := ctx.LockDir(nodeId, "toss")
73         if err != nil {
74                 ctx.LogE("rx", SDS{}, err, "lock")
75                 return false
76         }
77         defer ctx.UnlockDir(dirLock)
78         isBad := false
79         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
80         decompressor, err := zstd.NewReader(nil)
81         if err != nil {
82                 panic(err)
83         }
84         defer decompressor.Close()
85         for job := range ctx.Jobs(nodeId, TRx) {
86                 pktName := filepath.Base(job.Fd.Name())
87                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
88                 if job.PktEnc.Nice > nice {
89                         ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
90                         continue
91                 }
92                 pipeR, pipeW := io.Pipe()
93                 go func(job Job) error {
94                         pipeWB := bufio.NewWriter(pipeW)
95                         _, _, err := PktEncRead(
96                                 ctx.Self,
97                                 ctx.Neigh,
98                                 bufio.NewReader(job.Fd),
99                                 pipeWB,
100                         )
101                         job.Fd.Close() // #nosec G104
102                         if err != nil {
103                                 return pipeW.CloseWithError(err)
104                         }
105                         if err = pipeWB.Flush(); err != nil {
106                                 return pipeW.CloseWithError(err)
107                         }
108                         return pipeW.Close()
109                 }(job)
110                 var pkt Pkt
111                 var err error
112                 var pktSize int64
113                 var pktSizeBlocks int64
114                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
115                         ctx.LogE("rx", sds, err, "unmarshal")
116                         isBad = true
117                         goto Closing
118                 }
119                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
120                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
121                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
122                         pktSize -= poly1305.TagSize
123                 }
124                 pktSize -= pktSizeBlocks * poly1305.TagSize
125                 sds["size"] = pktSize
126                 ctx.LogD("rx", sds, "taken")
127                 switch pkt.Type {
128                 case PktTypeExec, PktTypeExecFat:
129                         if noExec {
130                                 goto Closing
131                         }
132                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
133                         handle := string(path[0])
134                         args := make([]string, 0, len(path)-1)
135                         for _, p := range path[1:] {
136                                 args = append(args, string(p))
137                         }
138                         argsStr := strings.Join(append([]string{handle}, args...), " ")
139                         sds := SdsAdd(sds, SDS{
140                                 "type": "exec",
141                                 "dst":  argsStr,
142                         })
143                         sender := ctx.Neigh[*job.PktEnc.Sender]
144                         cmdline, exists := sender.Exec[handle]
145                         if !exists || len(cmdline) == 0 {
146                                 ctx.LogE("rx", sds, errors.New("No handle found"), "")
147                                 isBad = true
148                                 goto Closing
149                         }
150                         if pkt.Type == PktTypeExec {
151                                 if err = decompressor.Reset(pipeR); err != nil {
152                                         log.Fatalln(err)
153                                 }
154                         }
155                         if !dryRun {
156                                 cmd := exec.Command(
157                                         cmdline[0],
158                                         append(cmdline[1:], args...)...,
159                                 )
160                                 cmd.Env = append(
161                                         cmd.Env,
162                                         "NNCP_SELF="+ctx.Self.Id.String(),
163                                         "NNCP_SENDER="+sender.Id.String(),
164                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
165                                 )
166                                 if pkt.Type == PktTypeExec {
167                                         cmd.Stdin = decompressor
168                                 } else {
169                                         cmd.Stdin = pipeR
170                                 }
171                                 output, err := cmd.Output()
172                                 if err != nil {
173                                         ctx.LogE("rx", sds, err, "handle")
174                                         isBad = true
175                                         goto Closing
176                                 }
177                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
178                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
179                                         if !exists {
180                                                 notify, exists = ctx.NotifyExec["*."+handle]
181                                         }
182                                         if exists {
183                                                 cmd := exec.Command(
184                                                         sendmail[0],
185                                                         append(sendmail[1:], notify.To)...,
186                                                 )
187                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
188                                                         "Exec from %s: %s", sender.Name, argsStr,
189                                                 ), output)
190                                                 if err = cmd.Run(); err != nil {
191                                                         ctx.LogE("rx", sds, err, "notify")
192                                                 }
193                                         }
194                                 }
195                         }
196                         ctx.LogI("rx", sds, "")
197                         if !dryRun {
198                                 if doSeen {
199                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
200                                                 fd.Close() // #nosec G104
201                                         }
202                                 }
203                                 if err = os.Remove(job.Fd.Name()); err != nil {
204                                         ctx.LogE("rx", sds, err, "remove")
205                                         isBad = true
206                                 }
207                         }
208                 case PktTypeFile:
209                         if noFile {
210                                 goto Closing
211                         }
212                         dst := string(pkt.Path[:int(pkt.PathLen)])
213                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
214                         if filepath.IsAbs(dst) {
215                                 ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
216                                 isBad = true
217                                 goto Closing
218                         }
219                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
220                         if incoming == nil {
221                                 ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
222                                 isBad = true
223                                 goto Closing
224                         }
225                         dir := filepath.Join(*incoming, path.Dir(dst))
226                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
227                                 ctx.LogE("rx", sds, err, "mkdir")
228                                 isBad = true
229                                 goto Closing
230                         }
231                         if !dryRun {
232                                 tmp, err := TempFile(dir, "file")
233                                 if err != nil {
234                                         ctx.LogE("rx", sds, err, "mktemp")
235                                         isBad = true
236                                         goto Closing
237                                 }
238                                 sds["tmp"] = tmp.Name()
239                                 ctx.LogD("rx", sds, "created")
240                                 bufW := bufio.NewWriter(tmp)
241                                 if _, err = CopyProgressed(
242                                         bufW, pipeR, "Rx file",
243                                         SdsAdd(sds, SDS{"fullsize": sds["size"]}),
244                                         ctx.ShowPrgrs,
245                                 ); err != nil {
246                                         ctx.LogE("rx", sds, err, "copy")
247                                         isBad = true
248                                         goto Closing
249                                 }
250                                 if err = bufW.Flush(); err != nil {
251                                         tmp.Close() // #nosec G104
252                                         ctx.LogE("rx", sds, err, "copy")
253                                         isBad = true
254                                         goto Closing
255                                 }
256                                 if err = tmp.Sync(); err != nil {
257                                         tmp.Close() // #nosec G104
258                                         ctx.LogE("rx", sds, err, "copy")
259                                         isBad = true
260                                         goto Closing
261                                 }
262                                 if err = tmp.Close(); err != nil {
263                                         ctx.LogE("rx", sds, err, "copy")
264                                         isBad = true
265                                         goto Closing
266                                 }
267                                 dstPathOrig := filepath.Join(*incoming, dst)
268                                 dstPath := dstPathOrig
269                                 dstPathCtr := 0
270                                 for {
271                                         if _, err = os.Stat(dstPath); err != nil {
272                                                 if os.IsNotExist(err) {
273                                                         break
274                                                 }
275                                                 ctx.LogE("rx", sds, err, "stat")
276                                                 isBad = true
277                                                 goto Closing
278                                         }
279                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
280                                         dstPathCtr++
281                                 }
282                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
283                                         ctx.LogE("rx", sds, err, "rename")
284                                         isBad = true
285                                 }
286                                 if err = DirSync(*incoming); err != nil {
287                                         ctx.LogE("rx", sds, err, "sync")
288                                         isBad = true
289                                 }
290                                 delete(sds, "tmp")
291                         }
292                         ctx.LogI("rx", sds, "")
293                         if !dryRun {
294                                 if doSeen {
295                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
296                                                 fd.Close() // #nosec G104
297                                         }
298                                 }
299                                 if err = os.Remove(job.Fd.Name()); err != nil {
300                                         ctx.LogE("rx", sds, err, "remove")
301                                         isBad = true
302                                 }
303                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
304                                         cmd := exec.Command(
305                                                 sendmail[0],
306                                                 append(sendmail[1:], ctx.NotifyFile.To)...,
307                                         )
308                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
309                                                 "File from %s: %s (%s)",
310                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
311                                                 dst,
312                                                 humanize.IBytes(uint64(pktSize)),
313                                         ), nil)
314                                         if err = cmd.Run(); err != nil {
315                                                 ctx.LogE("rx", sds, err, "notify")
316                                         }
317                                 }
318                         }
319                 case PktTypeFreq:
320                         if noFreq {
321                                 goto Closing
322                         }
323                         src := string(pkt.Path[:int(pkt.PathLen)])
324                         if filepath.IsAbs(src) {
325                                 ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
326                                 isBad = true
327                                 goto Closing
328                         }
329                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
330                         dstRaw, err := ioutil.ReadAll(pipeR)
331                         if err != nil {
332                                 ctx.LogE("rx", sds, err, "read")
333                                 isBad = true
334                                 goto Closing
335                         }
336                         dst := string(dstRaw)
337                         sds["dst"] = dst
338                         sender := ctx.Neigh[*job.PktEnc.Sender]
339                         freqPath := sender.FreqPath
340                         if freqPath == nil {
341                                 ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
342                                 isBad = true
343                                 goto Closing
344                         }
345                         if !dryRun {
346                                 err = ctx.TxFile(
347                                         sender,
348                                         pkt.Nice,
349                                         filepath.Join(*freqPath, src),
350                                         dst,
351                                         sender.FreqChunked,
352                                         sender.FreqMinSize,
353                                         sender.FreqMaxSize,
354                                 )
355                                 if err != nil {
356                                         ctx.LogE("rx", sds, err, "tx file")
357                                         isBad = true
358                                         goto Closing
359                                 }
360                         }
361                         ctx.LogI("rx", sds, "")
362                         if !dryRun {
363                                 if doSeen {
364                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
365                                                 fd.Close() // #nosec G104
366                                         }
367                                 }
368                                 if err = os.Remove(job.Fd.Name()); err != nil {
369                                         ctx.LogE("rx", sds, err, "remove")
370                                         isBad = true
371                                 }
372                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
373                                         cmd := exec.Command(
374                                                 sendmail[0],
375                                                 append(sendmail[1:], ctx.NotifyFreq.To)...,
376                                         )
377                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
378                                                 "Freq from %s: %s", sender.Name, src,
379                                         ), nil)
380                                         if err = cmd.Run(); err != nil {
381                                                 ctx.LogE("rx", sds, err, "notify")
382                                         }
383                                 }
384                         }
385                 case PktTypeTrns:
386                         if noTrns {
387                                 goto Closing
388                         }
389                         dst := new([blake2b.Size256]byte)
390                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
391                         nodeId := NodeId(*dst)
392                         node, known := ctx.Neigh[nodeId]
393                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
394                         if !known {
395                                 ctx.LogE("rx", sds, errors.New("unknown node"), "")
396                                 isBad = true
397                                 goto Closing
398                         }
399                         ctx.LogD("rx", sds, "taken")
400                         if !dryRun {
401                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
402                                         ctx.LogE("rx", sds, err, "tx trns")
403                                         isBad = true
404                                         goto Closing
405                                 }
406                         }
407                         ctx.LogI("rx", sds, "")
408                         if !dryRun {
409                                 if doSeen {
410                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
411                                                 fd.Close() // #nosec G104
412                                         }
413                                 }
414                                 if err = os.Remove(job.Fd.Name()); err != nil {
415                                         ctx.LogE("rx", sds, err, "remove")
416                                         isBad = true
417                                 }
418                         }
419                 default:
420                         ctx.LogE("rx", sds, errors.New("unknown type"), "")
421                         isBad = true
422                 }
423         Closing:
424                 pipeR.Close() // #nosec G104
425         }
426         return isBad
427 }
428
429 func (ctx *Ctx) AutoToss(
430         nodeId *NodeId,
431         nice uint8,
432         doSeen, noFile, noFreq, noExec, noTrns bool,
433 ) (chan struct{}, chan bool) {
434         finish := make(chan struct{})
435         badCode := make(chan bool)
436         go func() {
437                 bad := false
438                 for {
439                         select {
440                         case <-finish:
441                                 badCode <- bad
442                                 break
443                         default:
444                         }
445                         time.Sleep(time.Second)
446                         bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns)
447                 }
448         }()
449         return finish, badCode
450 }