]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Merge branch 'develop'
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenSuffix = ".seen"
47 )
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(lines, []string{
57                         "MIME-Version: 1.0",
58                         "Content-Type: text/plain; charset=utf-8",
59                         "Content-Transfer-Encoding: base64",
60                         "",
61                         base64.StdEncoding.EncodeToString(body),
62                 }...)
63         }
64         return strings.NewReader(strings.Join(lines, "\n"))
65 }
66
67 func (ctx *Ctx) Toss(
68         nodeId *NodeId,
69         nice uint8,
70         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
71 ) bool {
72         dirLock, err := ctx.LockDir(nodeId, "toss")
73         if err != nil {
74                 ctx.LogE("rx", LEs{}, err, "lock")
75                 return false
76         }
77         defer ctx.UnlockDir(dirLock)
78         isBad := false
79         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
80         decompressor, err := zstd.NewReader(nil)
81         if err != nil {
82                 panic(err)
83         }
84         defer decompressor.Close()
85         for job := range ctx.Jobs(nodeId, TRx) {
86                 pktName := filepath.Base(job.Path)
87                 les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
88                 if job.PktEnc.Nice > nice {
89                         ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
90                         continue
91                 }
92                 fd, err := os.Open(job.Path)
93                 if err != nil {
94                         ctx.LogE("rx", les, err, "open")
95                         isBad = true
96                         continue
97                 }
98
99                 pipeR, pipeW := io.Pipe()
100                 go func(job Job) error {
101                         pipeWB := bufio.NewWriter(pipeW)
102                         _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
103                         fd.Close() // #nosec G104
104                         if err != nil {
105                                 return pipeW.CloseWithError(err)
106                         }
107                         if err = pipeWB.Flush(); err != nil {
108                                 return pipeW.CloseWithError(err)
109                         }
110                         return pipeW.Close()
111                 }(job)
112                 var pkt Pkt
113                 var pktSize int64
114                 var pktSizeBlocks int64
115                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116                         ctx.LogE("rx", les, err, "unmarshal")
117                         isBad = true
118                         goto Closing
119                 }
120                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
121                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
122                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
123                         pktSize -= poly1305.TagSize
124                 }
125                 pktSize -= pktSizeBlocks * poly1305.TagSize
126                 les = append(les, LE{"Size", pktSize})
127                 ctx.LogD("rx", les, "taken")
128                 switch pkt.Type {
129                 case PktTypeExec, PktTypeExecFat:
130                         if noExec {
131                                 goto Closing
132                         }
133                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
134                         handle := string(path[0])
135                         args := make([]string, 0, len(path)-1)
136                         for _, p := range path[1:] {
137                                 args = append(args, string(p))
138                         }
139                         argsStr := strings.Join(append([]string{handle}, args...), " ")
140                         les = append(les, LEs{
141                                 {"Type", "exec"},
142                                 {"Dst", argsStr},
143                         }...)
144                         sender := ctx.Neigh[*job.PktEnc.Sender]
145                         cmdline, exists := sender.Exec[handle]
146                         if !exists || len(cmdline) == 0 {
147                                 ctx.LogE("rx", les, errors.New("No handle found"), "")
148                                 isBad = true
149                                 goto Closing
150                         }
151                         if pkt.Type == PktTypeExec {
152                                 if err = decompressor.Reset(pipeR); err != nil {
153                                         log.Fatalln(err)
154                                 }
155                         }
156                         if !dryRun {
157                                 cmd := exec.Command(
158                                         cmdline[0],
159                                         append(cmdline[1:], args...)...,
160                                 )
161                                 cmd.Env = append(
162                                         cmd.Env,
163                                         "NNCP_SELF="+ctx.Self.Id.String(),
164                                         "NNCP_SENDER="+sender.Id.String(),
165                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
166                                 )
167                                 if pkt.Type == PktTypeExec {
168                                         cmd.Stdin = decompressor
169                                 } else {
170                                         cmd.Stdin = pipeR
171                                 }
172                                 output, err := cmd.Output()
173                                 if err != nil {
174                                         ctx.LogE("rx", les, err, "handle")
175                                         isBad = true
176                                         goto Closing
177                                 }
178                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
179                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
180                                         if !exists {
181                                                 notify, exists = ctx.NotifyExec["*."+handle]
182                                         }
183                                         if exists {
184                                                 cmd := exec.Command(
185                                                         sendmail[0],
186                                                         append(sendmail[1:], notify.To)...,
187                                                 )
188                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
189                                                         "Exec from %s: %s", sender.Name, argsStr,
190                                                 ), output)
191                                                 if err = cmd.Run(); err != nil {
192                                                         ctx.LogE("rx", les, err, "notify")
193                                                 }
194                                         }
195                                 }
196                         }
197                         ctx.LogI("rx", les, "")
198                         if !dryRun {
199                                 if doSeen {
200                                         if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
201                                                 fd.Close() // #nosec G104
202                                         }
203                                 }
204                                 if err = os.Remove(job.Path); err != nil {
205                                         ctx.LogE("rx", les, err, "remove")
206                                         isBad = true
207                                 } else if ctx.HdrUsage {
208                                         os.Remove(job.Path + HdrSuffix)
209                                 }
210                         }
211                 case PktTypeFile:
212                         if noFile {
213                                 goto Closing
214                         }
215                         dst := string(pkt.Path[:int(pkt.PathLen)])
216                         les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...)
217                         if filepath.IsAbs(dst) {
218                                 ctx.LogE("rx", les, errors.New("non-relative destination path"), "")
219                                 isBad = true
220                                 goto Closing
221                         }
222                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
223                         if incoming == nil {
224                                 ctx.LogE("rx", les, errors.New("incoming is not allowed"), "")
225                                 isBad = true
226                                 goto Closing
227                         }
228                         dir := filepath.Join(*incoming, path.Dir(dst))
229                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
230                                 ctx.LogE("rx", les, err, "mkdir")
231                                 isBad = true
232                                 goto Closing
233                         }
234                         if !dryRun {
235                                 tmp, err := TempFile(dir, "file")
236                                 if err != nil {
237                                         ctx.LogE("rx", les, err, "mktemp")
238                                         isBad = true
239                                         goto Closing
240                                 }
241                                 les = append(les, LE{"Tmp", tmp.Name()})
242                                 ctx.LogD("rx", les, "created")
243                                 bufW := bufio.NewWriter(tmp)
244                                 if _, err = CopyProgressed(
245                                         bufW, pipeR, "Rx file",
246                                         append(les, LE{"FullSize", pktSize}),
247                                         ctx.ShowPrgrs,
248                                 ); err != nil {
249                                         ctx.LogE("rx", les, err, "copy")
250                                         isBad = true
251                                         goto Closing
252                                 }
253                                 if err = bufW.Flush(); err != nil {
254                                         tmp.Close() // #nosec G104
255                                         ctx.LogE("rx", les, err, "copy")
256                                         isBad = true
257                                         goto Closing
258                                 }
259                                 if err = tmp.Sync(); err != nil {
260                                         tmp.Close() // #nosec G104
261                                         ctx.LogE("rx", les, err, "copy")
262                                         isBad = true
263                                         goto Closing
264                                 }
265                                 if err = tmp.Close(); err != nil {
266                                         ctx.LogE("rx", les, err, "copy")
267                                         isBad = true
268                                         goto Closing
269                                 }
270                                 dstPathOrig := filepath.Join(*incoming, dst)
271                                 dstPath := dstPathOrig
272                                 dstPathCtr := 0
273                                 for {
274                                         if _, err = os.Stat(dstPath); err != nil {
275                                                 if os.IsNotExist(err) {
276                                                         break
277                                                 }
278                                                 ctx.LogE("rx", les, err, "stat")
279                                                 isBad = true
280                                                 goto Closing
281                                         }
282                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
283                                         dstPathCtr++
284                                 }
285                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
286                                         ctx.LogE("rx", les, err, "rename")
287                                         isBad = true
288                                 }
289                                 if err = DirSync(*incoming); err != nil {
290                                         ctx.LogE("rx", les, err, "sync")
291                                         isBad = true
292                                 }
293                                 les = les[:len(les)-1] // delete Tmp
294                         }
295                         ctx.LogI("rx", les, "")
296                         if !dryRun {
297                                 if doSeen {
298                                         if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
299                                                 fd.Close() // #nosec G104
300                                         }
301                                 }
302                                 if err = os.Remove(job.Path); err != nil {
303                                         ctx.LogE("rx", les, err, "remove")
304                                         isBad = true
305                                 } else if ctx.HdrUsage {
306                                         os.Remove(job.Path + HdrSuffix)
307                                 }
308                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
309                                         cmd := exec.Command(
310                                                 sendmail[0],
311                                                 append(sendmail[1:], ctx.NotifyFile.To)...,
312                                         )
313                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
314                                                 "File from %s: %s (%s)",
315                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
316                                                 dst,
317                                                 humanize.IBytes(uint64(pktSize)),
318                                         ), nil)
319                                         if err = cmd.Run(); err != nil {
320                                                 ctx.LogE("rx", les, err, "notify")
321                                         }
322                                 }
323                         }
324                 case PktTypeFreq:
325                         if noFreq {
326                                 goto Closing
327                         }
328                         src := string(pkt.Path[:int(pkt.PathLen)])
329                         if filepath.IsAbs(src) {
330                                 ctx.LogE("rx", les, errors.New("non-relative source path"), "")
331                                 isBad = true
332                                 goto Closing
333                         }
334                         les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...)
335                         dstRaw, err := ioutil.ReadAll(pipeR)
336                         if err != nil {
337                                 ctx.LogE("rx", les, err, "read")
338                                 isBad = true
339                                 goto Closing
340                         }
341                         dst := string(dstRaw)
342                         les = append(les, LE{"Dst", dst})
343                         sender := ctx.Neigh[*job.PktEnc.Sender]
344                         freqPath := sender.FreqPath
345                         if freqPath == nil {
346                                 ctx.LogE("rx", les, errors.New("freqing is not allowed"), "")
347                                 isBad = true
348                                 goto Closing
349                         }
350                         if !dryRun {
351                                 err = ctx.TxFile(
352                                         sender,
353                                         pkt.Nice,
354                                         filepath.Join(*freqPath, src),
355                                         dst,
356                                         sender.FreqChunked,
357                                         sender.FreqMinSize,
358                                         sender.FreqMaxSize,
359                                 )
360                                 if err != nil {
361                                         ctx.LogE("rx", les, err, "tx file")
362                                         isBad = true
363                                         goto Closing
364                                 }
365                         }
366                         ctx.LogI("rx", les, "")
367                         if !dryRun {
368                                 if doSeen {
369                                         if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
370                                                 fd.Close() // #nosec G104
371                                         }
372                                 }
373                                 if err = os.Remove(job.Path); err != nil {
374                                         ctx.LogE("rx", les, err, "remove")
375                                         isBad = true
376                                 } else if ctx.HdrUsage {
377                                         os.Remove(job.Path + HdrSuffix)
378                                 }
379                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
380                                         cmd := exec.Command(
381                                                 sendmail[0],
382                                                 append(sendmail[1:], ctx.NotifyFreq.To)...,
383                                         )
384                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
385                                                 "Freq from %s: %s", sender.Name, src,
386                                         ), nil)
387                                         if err = cmd.Run(); err != nil {
388                                                 ctx.LogE("rx", les, err, "notify")
389                                         }
390                                 }
391                         }
392                 case PktTypeTrns:
393                         if noTrns {
394                                 goto Closing
395                         }
396                         dst := new([blake2b.Size256]byte)
397                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
398                         nodeId := NodeId(*dst)
399                         node, known := ctx.Neigh[nodeId]
400                         les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...)
401                         if !known {
402                                 ctx.LogE("rx", les, errors.New("unknown node"), "")
403                                 isBad = true
404                                 goto Closing
405                         }
406                         ctx.LogD("rx", les, "taken")
407                         if !dryRun {
408                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
409                                         ctx.LogE("rx", les, err, "tx trns")
410                                         isBad = true
411                                         goto Closing
412                                 }
413                         }
414                         ctx.LogI("rx", les, "")
415                         if !dryRun {
416                                 if doSeen {
417                                         if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
418                                                 fd.Close() // #nosec G104
419                                         }
420                                 }
421                                 if err = os.Remove(job.Path); err != nil {
422                                         ctx.LogE("rx", les, err, "remove")
423                                         isBad = true
424                                 } else if ctx.HdrUsage {
425                                         os.Remove(job.Path + HdrSuffix)
426                                 }
427                         }
428                 default:
429                         ctx.LogE("rx", les, errors.New("unknown type"), "")
430                         isBad = true
431                 }
432         Closing:
433                 pipeR.Close() // #nosec G104
434         }
435         return isBad
436 }
437
438 func (ctx *Ctx) AutoToss(
439         nodeId *NodeId,
440         nice uint8,
441         doSeen, noFile, noFreq, noExec, noTrns bool,
442 ) (chan struct{}, chan bool) {
443         finish := make(chan struct{})
444         badCode := make(chan bool)
445         go func() {
446                 bad := false
447                 for {
448                         select {
449                         case <-finish:
450                                 badCode <- bad
451                                 break
452                         default:
453                         }
454                         time.Sleep(time.Second)
455                         bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns)
456                 }
457         }()
458         return finish, badCode
459 }