]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
recfile log format
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenSuffix = ".seen"
47 )
48
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50         lines := []string{
51                 "From: " + fromTo.From,
52                 "To: " + fromTo.To,
53                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54         }
55         if len(body) > 0 {
56                 lines = append(lines, []string{
57                         "MIME-Version: 1.0",
58                         "Content-Type: text/plain; charset=utf-8",
59                         "Content-Transfer-Encoding: base64",
60                         "",
61                         base64.StdEncoding.EncodeToString(body),
62                 }...)
63         }
64         return strings.NewReader(strings.Join(lines, "\n"))
65 }
66
67 func (ctx *Ctx) Toss(
68         nodeId *NodeId,
69         nice uint8,
70         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
71 ) bool {
72         dirLock, err := ctx.LockDir(nodeId, "toss")
73         if err != nil {
74                 ctx.LogE("rx", LEs{}, err, "lock")
75                 return false
76         }
77         defer ctx.UnlockDir(dirLock)
78         isBad := false
79         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
80         decompressor, err := zstd.NewReader(nil)
81         if err != nil {
82                 panic(err)
83         }
84         defer decompressor.Close()
85         for job := range ctx.Jobs(nodeId, TRx) {
86                 pktName := filepath.Base(job.Fd.Name())
87                 les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
88                 if job.PktEnc.Nice > nice {
89                         ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
90                         job.Fd.Close() // #nosec G104
91                         continue
92                 }
93                 pipeR, pipeW := io.Pipe()
94                 go func(job Job) error {
95                         pipeWB := bufio.NewWriter(pipeW)
96                         _, _, err := PktEncRead(
97                                 ctx.Self,
98                                 ctx.Neigh,
99                                 bufio.NewReader(job.Fd),
100                                 pipeWB,
101                         )
102                         job.Fd.Close() // #nosec G104
103                         if err != nil {
104                                 return pipeW.CloseWithError(err)
105                         }
106                         if err = pipeWB.Flush(); err != nil {
107                                 return pipeW.CloseWithError(err)
108                         }
109                         return pipeW.Close()
110                 }(job)
111                 var pkt Pkt
112                 var err error
113                 var pktSize int64
114                 var pktSizeBlocks int64
115                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116                         ctx.LogE("rx", les, err, "unmarshal")
117                         isBad = true
118                         goto Closing
119                 }
120                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
121                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
122                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
123                         pktSize -= poly1305.TagSize
124                 }
125                 pktSize -= pktSizeBlocks * poly1305.TagSize
126                 les = append(les, LE{"Size", pktSize})
127                 ctx.LogD("rx", les, "taken")
128                 switch pkt.Type {
129                 case PktTypeExec, PktTypeExecFat:
130                         if noExec {
131                                 goto Closing
132                         }
133                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
134                         handle := string(path[0])
135                         args := make([]string, 0, len(path)-1)
136                         for _, p := range path[1:] {
137                                 args = append(args, string(p))
138                         }
139                         argsStr := strings.Join(append([]string{handle}, args...), " ")
140                         les = append(les, LEs{
141                                 {"Type", "exec"},
142                                 {"Dst", argsStr},
143                         }...)
144                         sender := ctx.Neigh[*job.PktEnc.Sender]
145                         cmdline, exists := sender.Exec[handle]
146                         if !exists || len(cmdline) == 0 {
147                                 ctx.LogE("rx", les, errors.New("No handle found"), "")
148                                 isBad = true
149                                 goto Closing
150                         }
151                         if pkt.Type == PktTypeExec {
152                                 if err = decompressor.Reset(pipeR); err != nil {
153                                         log.Fatalln(err)
154                                 }
155                         }
156                         if !dryRun {
157                                 cmd := exec.Command(
158                                         cmdline[0],
159                                         append(cmdline[1:], args...)...,
160                                 )
161                                 cmd.Env = append(
162                                         cmd.Env,
163                                         "NNCP_SELF="+ctx.Self.Id.String(),
164                                         "NNCP_SENDER="+sender.Id.String(),
165                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
166                                 )
167                                 if pkt.Type == PktTypeExec {
168                                         cmd.Stdin = decompressor
169                                 } else {
170                                         cmd.Stdin = pipeR
171                                 }
172                                 output, err := cmd.Output()
173                                 if err != nil {
174                                         ctx.LogE("rx", les, err, "handle")
175                                         isBad = true
176                                         goto Closing
177                                 }
178                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
179                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
180                                         if !exists {
181                                                 notify, exists = ctx.NotifyExec["*."+handle]
182                                         }
183                                         if exists {
184                                                 cmd := exec.Command(
185                                                         sendmail[0],
186                                                         append(sendmail[1:], notify.To)...,
187                                                 )
188                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
189                                                         "Exec from %s: %s", sender.Name, argsStr,
190                                                 ), output)
191                                                 if err = cmd.Run(); err != nil {
192                                                         ctx.LogE("rx", les, err, "notify")
193                                                 }
194                                         }
195                                 }
196                         }
197                         ctx.LogI("rx", les, "")
198                         if !dryRun {
199                                 if doSeen {
200                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
201                                                 fd.Close() // #nosec G104
202                                         }
203                                 }
204                                 if err = os.Remove(job.Fd.Name()); err != nil {
205                                         ctx.LogE("rx", les, err, "remove")
206                                         isBad = true
207                                 }
208                         }
209                 case PktTypeFile:
210                         if noFile {
211                                 goto Closing
212                         }
213                         dst := string(pkt.Path[:int(pkt.PathLen)])
214                         les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...)
215                         if filepath.IsAbs(dst) {
216                                 ctx.LogE("rx", les, errors.New("non-relative destination path"), "")
217                                 isBad = true
218                                 goto Closing
219                         }
220                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
221                         if incoming == nil {
222                                 ctx.LogE("rx", les, errors.New("incoming is not allowed"), "")
223                                 isBad = true
224                                 goto Closing
225                         }
226                         dir := filepath.Join(*incoming, path.Dir(dst))
227                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
228                                 ctx.LogE("rx", les, err, "mkdir")
229                                 isBad = true
230                                 goto Closing
231                         }
232                         if !dryRun {
233                                 tmp, err := TempFile(dir, "file")
234                                 if err != nil {
235                                         ctx.LogE("rx", les, err, "mktemp")
236                                         isBad = true
237                                         goto Closing
238                                 }
239                                 les = append(les, LE{"Tmp", tmp.Name()})
240                                 ctx.LogD("rx", les, "created")
241                                 bufW := bufio.NewWriter(tmp)
242                                 if _, err = CopyProgressed(
243                                         bufW, pipeR, "Rx file",
244                                         append(les, LE{"FullSize", pktSize}),
245                                         ctx.ShowPrgrs,
246                                 ); err != nil {
247                                         ctx.LogE("rx", les, err, "copy")
248                                         isBad = true
249                                         goto Closing
250                                 }
251                                 if err = bufW.Flush(); err != nil {
252                                         tmp.Close() // #nosec G104
253                                         ctx.LogE("rx", les, err, "copy")
254                                         isBad = true
255                                         goto Closing
256                                 }
257                                 if err = tmp.Sync(); err != nil {
258                                         tmp.Close() // #nosec G104
259                                         ctx.LogE("rx", les, err, "copy")
260                                         isBad = true
261                                         goto Closing
262                                 }
263                                 if err = tmp.Close(); err != nil {
264                                         ctx.LogE("rx", les, err, "copy")
265                                         isBad = true
266                                         goto Closing
267                                 }
268                                 dstPathOrig := filepath.Join(*incoming, dst)
269                                 dstPath := dstPathOrig
270                                 dstPathCtr := 0
271                                 for {
272                                         if _, err = os.Stat(dstPath); err != nil {
273                                                 if os.IsNotExist(err) {
274                                                         break
275                                                 }
276                                                 ctx.LogE("rx", les, err, "stat")
277                                                 isBad = true
278                                                 goto Closing
279                                         }
280                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
281                                         dstPathCtr++
282                                 }
283                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
284                                         ctx.LogE("rx", les, err, "rename")
285                                         isBad = true
286                                 }
287                                 if err = DirSync(*incoming); err != nil {
288                                         ctx.LogE("rx", les, err, "sync")
289                                         isBad = true
290                                 }
291                                 les = les[:len(les)-1] // delete Tmp
292                         }
293                         ctx.LogI("rx", les, "")
294                         if !dryRun {
295                                 if doSeen {
296                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
297                                                 fd.Close() // #nosec G104
298                                         }
299                                 }
300                                 if err = os.Remove(job.Fd.Name()); err != nil {
301                                         ctx.LogE("rx", les, err, "remove")
302                                         isBad = true
303                                 }
304                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
305                                         cmd := exec.Command(
306                                                 sendmail[0],
307                                                 append(sendmail[1:], ctx.NotifyFile.To)...,
308                                         )
309                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
310                                                 "File from %s: %s (%s)",
311                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
312                                                 dst,
313                                                 humanize.IBytes(uint64(pktSize)),
314                                         ), nil)
315                                         if err = cmd.Run(); err != nil {
316                                                 ctx.LogE("rx", les, err, "notify")
317                                         }
318                                 }
319                         }
320                 case PktTypeFreq:
321                         if noFreq {
322                                 goto Closing
323                         }
324                         src := string(pkt.Path[:int(pkt.PathLen)])
325                         if filepath.IsAbs(src) {
326                                 ctx.LogE("rx", les, errors.New("non-relative source path"), "")
327                                 isBad = true
328                                 goto Closing
329                         }
330                         les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...)
331                         dstRaw, err := ioutil.ReadAll(pipeR)
332                         if err != nil {
333                                 ctx.LogE("rx", les, err, "read")
334                                 isBad = true
335                                 goto Closing
336                         }
337                         dst := string(dstRaw)
338                         les = append(les, LE{"Dst", dst})
339                         sender := ctx.Neigh[*job.PktEnc.Sender]
340                         freqPath := sender.FreqPath
341                         if freqPath == nil {
342                                 ctx.LogE("rx", les, errors.New("freqing is not allowed"), "")
343                                 isBad = true
344                                 goto Closing
345                         }
346                         if !dryRun {
347                                 err = ctx.TxFile(
348                                         sender,
349                                         pkt.Nice,
350                                         filepath.Join(*freqPath, src),
351                                         dst,
352                                         sender.FreqChunked,
353                                         sender.FreqMinSize,
354                                         sender.FreqMaxSize,
355                                 )
356                                 if err != nil {
357                                         ctx.LogE("rx", les, err, "tx file")
358                                         isBad = true
359                                         goto Closing
360                                 }
361                         }
362                         ctx.LogI("rx", les, "")
363                         if !dryRun {
364                                 if doSeen {
365                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
366                                                 fd.Close() // #nosec G104
367                                         }
368                                 }
369                                 if err = os.Remove(job.Fd.Name()); err != nil {
370                                         ctx.LogE("rx", les, err, "remove")
371                                         isBad = true
372                                 }
373                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
374                                         cmd := exec.Command(
375                                                 sendmail[0],
376                                                 append(sendmail[1:], ctx.NotifyFreq.To)...,
377                                         )
378                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
379                                                 "Freq from %s: %s", sender.Name, src,
380                                         ), nil)
381                                         if err = cmd.Run(); err != nil {
382                                                 ctx.LogE("rx", les, err, "notify")
383                                         }
384                                 }
385                         }
386                 case PktTypeTrns:
387                         if noTrns {
388                                 goto Closing
389                         }
390                         dst := new([blake2b.Size256]byte)
391                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
392                         nodeId := NodeId(*dst)
393                         node, known := ctx.Neigh[nodeId]
394                         les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...)
395                         if !known {
396                                 ctx.LogE("rx", les, errors.New("unknown node"), "")
397                                 isBad = true
398                                 goto Closing
399                         }
400                         ctx.LogD("rx", les, "taken")
401                         if !dryRun {
402                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
403                                         ctx.LogE("rx", les, err, "tx trns")
404                                         isBad = true
405                                         goto Closing
406                                 }
407                         }
408                         ctx.LogI("rx", les, "")
409                         if !dryRun {
410                                 if doSeen {
411                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
412                                                 fd.Close() // #nosec G104
413                                         }
414                                 }
415                                 if err = os.Remove(job.Fd.Name()); err != nil {
416                                         ctx.LogE("rx", les, err, "remove")
417                                         isBad = true
418                                 }
419                         }
420                 default:
421                         ctx.LogE("rx", les, errors.New("unknown type"), "")
422                         isBad = true
423                 }
424         Closing:
425                 pipeR.Close() // #nosec G104
426         }
427         return isBad
428 }
429
430 func (ctx *Ctx) AutoToss(
431         nodeId *NodeId,
432         nice uint8,
433         doSeen, noFile, noFreq, noExec, noTrns bool,
434 ) (chan struct{}, chan bool) {
435         finish := make(chan struct{})
436         badCode := make(chan bool)
437         go func() {
438                 bad := false
439                 for {
440                         select {
441                         case <-finish:
442                                 badCode <- bad
443                                 break
444                         default:
445                         }
446                         time.Sleep(time.Second)
447                         bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns)
448                 }
449         }()
450         return finish, badCode
451 }