]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
AutoToss every second, not after call is finished
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenSuffix = ".seen"
47 )
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(lines, []string{
57                         "MIME-Version: 1.0",
58                         "Content-Type: text/plain; charset=utf-8",
59                         "Content-Transfer-Encoding: base64",
60                         "",
61                         base64.StdEncoding.EncodeToString(body),
62                 }...)
63         }
64         return strings.NewReader(strings.Join(lines, "\n"))
65 }
66
67 func (ctx *Ctx) Toss(
68         nodeId *NodeId,
69         nice uint8,
70         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
71 ) bool {
72         dirLock, err := ctx.LockDir(nodeId, "toss")
73         if err != nil {
74                 ctx.LogE("rx", SDS{}, err, "lock")
75                 return false
76         }
77         defer ctx.UnlockDir(dirLock)
78         isBad := false
79         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
80         decompressor, err := zstd.NewReader(nil)
81         if err != nil {
82                 panic(err)
83         }
84         defer decompressor.Close()
85         for job := range ctx.Jobs(nodeId, TRx) {
86                 pktName := filepath.Base(job.Fd.Name())
87                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
88                 if job.PktEnc.Nice > nice {
89                         ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
90                         continue
91                 }
92                 pipeR, pipeW := io.Pipe()
93                 go func(job Job) error {
94                         pipeWB := bufio.NewWriter(pipeW)
95                         _, _, err := PktEncRead(
96                                 ctx.Self,
97                                 ctx.Neigh,
98                                 bufio.NewReader(job.Fd),
99                                 pipeWB,
100                         )
101                         job.Fd.Close() // #nosec G104
102                         if err != nil {
103                                 ctx.LogE("rx", sds, err, "decryption")
104                                 return pipeW.CloseWithError(err)
105                         }
106                         if err = pipeWB.Flush(); err != nil {
107                                 ctx.LogE("rx", sds, err, "decryption flush")
108                                 return pipeW.CloseWithError(err)
109                         }
110                         return pipeW.Close()
111                 }(job)
112                 var pkt Pkt
113                 var err error
114                 var pktSize int64
115                 var pktSizeBlocks int64
116                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
117                         ctx.LogE("rx", sds, err, "unmarshal")
118                         isBad = true
119                         goto Closing
120                 }
121                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
122                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
123                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
124                         pktSize -= poly1305.TagSize
125                 }
126                 pktSize -= pktSizeBlocks * poly1305.TagSize
127                 sds["size"] = pktSize
128                 ctx.LogD("rx", sds, "taken")
129                 switch pkt.Type {
130                 case PktTypeExec, PktTypeExecFat:
131                         if noExec {
132                                 goto Closing
133                         }
134                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
135                         handle := string(path[0])
136                         args := make([]string, 0, len(path)-1)
137                         for _, p := range path[1:] {
138                                 args = append(args, string(p))
139                         }
140                         argsStr := strings.Join(append([]string{handle}, args...), " ")
141                         sds := SdsAdd(sds, SDS{
142                                 "type": "exec",
143                                 "dst":  argsStr,
144                         })
145                         sender := ctx.Neigh[*job.PktEnc.Sender]
146                         cmdline, exists := sender.Exec[handle]
147                         if !exists || len(cmdline) == 0 {
148                                 ctx.LogE("rx", sds, errors.New("No handle found"), "")
149                                 isBad = true
150                                 goto Closing
151                         }
152                         if pkt.Type == PktTypeExec {
153                                 if err = decompressor.Reset(pipeR); err != nil {
154                                         log.Fatalln(err)
155                                 }
156                         }
157                         if !dryRun {
158                                 cmd := exec.Command(
159                                         cmdline[0],
160                                         append(cmdline[1:len(cmdline)], args...)...,
161                                 )
162                                 cmd.Env = append(
163                                         cmd.Env,
164                                         "NNCP_SELF="+ctx.Self.Id.String(),
165                                         "NNCP_SENDER="+sender.Id.String(),
166                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
167                                 )
168                                 if pkt.Type == PktTypeExec {
169                                         cmd.Stdin = decompressor
170                                 } else {
171                                         cmd.Stdin = pipeR
172                                 }
173                                 output, err := cmd.Output()
174                                 if err != nil {
175                                         ctx.LogE("rx", sds, err, "handle")
176                                         isBad = true
177                                         goto Closing
178                                 }
179                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
180                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
181                                         if !exists {
182                                                 notify, exists = ctx.NotifyExec["*."+handle]
183                                         }
184                                         if exists {
185                                                 cmd := exec.Command(
186                                                         sendmail[0],
187                                                         append(sendmail[1:len(sendmail)], notify.To)...,
188                                                 )
189                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
190                                                         "Exec from %s: %s", sender.Name, argsStr,
191                                                 ), output)
192                                                 if err = cmd.Run(); err != nil {
193                                                         ctx.LogE("rx", sds, err, "notify")
194                                                 }
195                                         }
196                                 }
197                         }
198                         ctx.LogI("rx", sds, "")
199                         if !dryRun {
200                                 if doSeen {
201                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
202                                                 fd.Close() // #nosec G104
203                                         }
204                                 }
205                                 if err = os.Remove(job.Fd.Name()); err != nil {
206                                         ctx.LogE("rx", sds, err, "remove")
207                                         isBad = true
208                                 }
209                         }
210                 case PktTypeFile:
211                         if noFile {
212                                 goto Closing
213                         }
214                         dst := string(pkt.Path[:int(pkt.PathLen)])
215                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
216                         if filepath.IsAbs(dst) {
217                                 ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
218                                 isBad = true
219                                 goto Closing
220                         }
221                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
222                         if incoming == nil {
223                                 ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
224                                 isBad = true
225                                 goto Closing
226                         }
227                         dir := filepath.Join(*incoming, path.Dir(dst))
228                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
229                                 ctx.LogE("rx", sds, err, "mkdir")
230                                 isBad = true
231                                 goto Closing
232                         }
233                         if !dryRun {
234                                 tmp, err := TempFile(dir, "file")
235                                 if err != nil {
236                                         ctx.LogE("rx", sds, err, "mktemp")
237                                         isBad = true
238                                         goto Closing
239                                 }
240                                 sds["tmp"] = tmp.Name()
241                                 ctx.LogD("rx", sds, "created")
242                                 bufW := bufio.NewWriter(tmp)
243                                 if _, err = CopyProgressed(
244                                         bufW, pipeR, "Rx file",
245                                         SdsAdd(sds, SDS{"fullsize": sds["size"]}),
246                                         ctx.ShowPrgrs,
247                                 ); err != nil {
248                                         ctx.LogE("rx", sds, err, "copy")
249                                         isBad = true
250                                         goto Closing
251                                 }
252                                 if err = bufW.Flush(); err != nil {
253                                         tmp.Close() // #nosec G104
254                                         ctx.LogE("rx", sds, err, "copy")
255                                         isBad = true
256                                         goto Closing
257                                 }
258                                 if err = tmp.Sync(); err != nil {
259                                         tmp.Close() // #nosec G104
260                                         ctx.LogE("rx", sds, err, "copy")
261                                         isBad = true
262                                         goto Closing
263                                 }
264                                 if err = tmp.Close(); err != nil {
265                                         ctx.LogE("rx", sds, err, "copy")
266                                         isBad = true
267                                         goto Closing
268                                 }
269                                 dstPathOrig := filepath.Join(*incoming, dst)
270                                 dstPath := dstPathOrig
271                                 dstPathCtr := 0
272                                 for {
273                                         if _, err = os.Stat(dstPath); err != nil {
274                                                 if os.IsNotExist(err) {
275                                                         break
276                                                 }
277                                                 ctx.LogE("rx", sds, err, "stat")
278                                                 isBad = true
279                                                 goto Closing
280                                         }
281                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
282                                         dstPathCtr++
283                                 }
284                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
285                                         ctx.LogE("rx", sds, err, "rename")
286                                         isBad = true
287                                 }
288                                 if err = DirSync(*incoming); err != nil {
289                                         ctx.LogE("rx", sds, err, "sync")
290                                         isBad = true
291                                 }
292                                 delete(sds, "tmp")
293                         }
294                         ctx.LogI("rx", sds, "")
295                         if !dryRun {
296                                 if doSeen {
297                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
298                                                 fd.Close() // #nosec G104
299                                         }
300                                 }
301                                 if err = os.Remove(job.Fd.Name()); err != nil {
302                                         ctx.LogE("rx", sds, err, "remove")
303                                         isBad = true
304                                 }
305                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
306                                         cmd := exec.Command(
307                                                 sendmail[0],
308                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
309                                         )
310                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
311                                                 "File from %s: %s (%s)",
312                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
313                                                 dst,
314                                                 humanize.IBytes(uint64(pktSize)),
315                                         ), nil)
316                                         if err = cmd.Run(); err != nil {
317                                                 ctx.LogE("rx", sds, err, "notify")
318                                         }
319                                 }
320                         }
321                 case PktTypeFreq:
322                         if noFreq {
323                                 goto Closing
324                         }
325                         src := string(pkt.Path[:int(pkt.PathLen)])
326                         if filepath.IsAbs(src) {
327                                 ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
328                                 isBad = true
329                                 goto Closing
330                         }
331                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
332                         dstRaw, err := ioutil.ReadAll(pipeR)
333                         if err != nil {
334                                 ctx.LogE("rx", sds, err, "read")
335                                 isBad = true
336                                 goto Closing
337                         }
338                         dst := string(dstRaw)
339                         sds["dst"] = dst
340                         sender := ctx.Neigh[*job.PktEnc.Sender]
341                         freqPath := sender.FreqPath
342                         if freqPath == nil {
343                                 ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
344                                 isBad = true
345                                 goto Closing
346                         }
347                         if !dryRun {
348                                 err = ctx.TxFile(
349                                         sender,
350                                         pkt.Nice,
351                                         filepath.Join(*freqPath, src),
352                                         dst,
353                                         sender.FreqChunked,
354                                         sender.FreqMinSize,
355                                         sender.FreqMaxSize,
356                                 )
357                                 if err != nil {
358                                         ctx.LogE("rx", sds, err, "tx file")
359                                         isBad = true
360                                         goto Closing
361                                 }
362                         }
363                         ctx.LogI("rx", sds, "")
364                         if !dryRun {
365                                 if doSeen {
366                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
367                                                 fd.Close() // #nosec G104
368                                         }
369                                 }
370                                 if err = os.Remove(job.Fd.Name()); err != nil {
371                                         ctx.LogE("rx", sds, err, "remove")
372                                         isBad = true
373                                 }
374                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
375                                         cmd := exec.Command(
376                                                 sendmail[0],
377                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
378                                         )
379                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
380                                                 "Freq from %s: %s", sender.Name, src,
381                                         ), nil)
382                                         if err = cmd.Run(); err != nil {
383                                                 ctx.LogE("rx", sds, err, "notify")
384                                         }
385                                 }
386                         }
387                 case PktTypeTrns:
388                         if noTrns {
389                                 goto Closing
390                         }
391                         dst := new([blake2b.Size256]byte)
392                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
393                         nodeId := NodeId(*dst)
394                         node, known := ctx.Neigh[nodeId]
395                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
396                         if !known {
397                                 ctx.LogE("rx", sds, errors.New("unknown node"), "")
398                                 isBad = true
399                                 goto Closing
400                         }
401                         ctx.LogD("rx", sds, "taken")
402                         if !dryRun {
403                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
404                                         ctx.LogE("rx", sds, err, "tx trns")
405                                         isBad = true
406                                         goto Closing
407                                 }
408                         }
409                         ctx.LogI("rx", sds, "")
410                         if !dryRun {
411                                 if doSeen {
412                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
413                                                 fd.Close() // #nosec G104
414                                         }
415                                 }
416                                 if err = os.Remove(job.Fd.Name()); err != nil {
417                                         ctx.LogE("rx", sds, err, "remove")
418                                         isBad = true
419                                 }
420                         }
421                 default:
422                         ctx.LogE("rx", sds, errors.New("unknown type"), "")
423                         isBad = true
424                 }
425         Closing:
426                 pipeR.Close() // #nosec G104
427         }
428         return isBad
429 }
430
431 func (ctx *Ctx) AutoToss(
432         nodeId *NodeId,
433         nice uint8,
434         doSeen, noFile, noFreq, noExec, noTrns bool,
435 ) (chan struct{}, chan bool) {
436         finish := make(chan struct{})
437         badCode := make(chan bool)
438         go func() {
439                 bad := false
440                 for {
441                         select {
442                         case <-finish:
443                                 badCode <- bad
444                                 break
445                         default:
446                         }
447                         time.Sleep(time.Second)
448                         bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns)
449                 }
450         }()
451         return finish, badCode
452 }