Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Operations progress
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36
37         xdr "github.com/davecgh/go-xdr/xdr2"
38         "github.com/dustin/go-humanize"
39         "github.com/klauspost/compress/zstd"
40         "golang.org/x/crypto/blake2b"
41         "golang.org/x/crypto/poly1305"
42 )
43
// SeenSuffix is appended to a packet's file name to form the name of the
// empty marker file that Toss creates when it is asked to remember
// already processed packets.
const SeenSuffix = ".seen"
47
48 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
49         lines := []string{
50                 "From: " + fromTo.From,
51                 "To: " + fromTo.To,
52                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
53         }
54         if len(body) > 0 {
55                 lines = append(lines, []string{
56                         "MIME-Version: 1.0",
57                         "Content-Type: text/plain; charset=utf-8",
58                         "Content-Transfer-Encoding: base64",
59                         "",
60                         base64.StdEncoding.EncodeToString(body),
61                 }...)
62         }
63         return strings.NewReader(strings.Join(lines, "\n"))
64 }
65
66 func (ctx *Ctx) Toss(
67         nodeId *NodeId,
68         nice uint8,
69         dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
70 ) bool {
71         isBad := false
72         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
73         decompressor, err := zstd.NewReader(nil)
74         if err != nil {
75                 panic(err)
76         }
77         defer decompressor.Close()
78         for job := range ctx.Jobs(nodeId, TRx) {
79                 pktName := filepath.Base(job.Fd.Name())
80                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
81                 if job.PktEnc.Nice > nice {
82                         ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
83                         continue
84                 }
85                 pipeR, pipeW := io.Pipe()
86                 errs := make(chan error, 1)
87                 go func(job Job) {
88                         pipeWB := bufio.NewWriter(pipeW)
89                         _, _, err := PktEncRead(
90                                 ctx.Self,
91                                 ctx.Neigh,
92                                 bufio.NewReader(job.Fd),
93                                 pipeWB,
94                         )
95                         errs <- err
96                         pipeWB.Flush()
97                         pipeW.Close()
98                         job.Fd.Close()
99                         if err != nil {
100                                 ctx.LogE("rx", sds, err, "decryption")
101                         }
102                 }(job)
103                 var pkt Pkt
104                 var err error
105                 var pktSize int64
106                 var pktSizeBlocks int64
107                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
108                         ctx.LogE("rx", sds, err, "unmarshal")
109                         isBad = true
110                         goto Closing
111                 }
112                 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
113                 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
114                 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
115                         pktSize -= poly1305.TagSize
116                 }
117                 pktSize -= pktSizeBlocks * poly1305.TagSize
118                 sds["size"] = pktSize
119                 ctx.LogD("rx", sds, "taken")
120                 switch pkt.Type {
121                 case PktTypeExec:
122                         if noExec {
123                                 goto Closing
124                         }
125                         path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
126                         handle := string(path[0])
127                         args := make([]string, 0, len(path)-1)
128                         for _, p := range path[1:] {
129                                 args = append(args, string(p))
130                         }
131                         argsStr := strings.Join(append([]string{handle}, args...), " ")
132                         sds := SdsAdd(sds, SDS{
133                                 "type": "exec",
134                                 "dst":  argsStr,
135                         })
136                         sender := ctx.Neigh[*job.PktEnc.Sender]
137                         cmdline, exists := sender.Exec[handle]
138                         if !exists || len(cmdline) == 0 {
139                                 ctx.LogE("rx", sds, errors.New("No handle found"), "")
140                                 isBad = true
141                                 goto Closing
142                         }
143                         if err = decompressor.Reset(pipeR); err != nil {
144                                 log.Fatalln(err)
145                         }
146                         if !dryRun {
147                                 cmd := exec.Command(
148                                         cmdline[0],
149                                         append(cmdline[1:len(cmdline)], args...)...,
150                                 )
151                                 cmd.Env = append(
152                                         cmd.Env,
153                                         "NNCP_SELF="+ctx.Self.Id.String(),
154                                         "NNCP_SENDER="+sender.Id.String(),
155                                         "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
156                                 )
157                                 cmd.Stdin = decompressor
158                                 output, err := cmd.Output()
159                                 if err != nil {
160                                         ctx.LogE("rx", sds, err, "handle")
161                                         isBad = true
162                                         goto Closing
163                                 }
164                                 if len(sendmail) > 0 && ctx.NotifyExec != nil {
165                                         notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
166                                         if !exists {
167                                                 notify, exists = ctx.NotifyExec["*."+handle]
168                                         }
169                                         if exists {
170                                                 cmd := exec.Command(
171                                                         sendmail[0],
172                                                         append(sendmail[1:len(sendmail)], notify.To)...,
173                                                 )
174                                                 cmd.Stdin = newNotification(notify, fmt.Sprintf(
175                                                         "Exec from %s: %s", sender.Name, argsStr,
176                                                 ), output)
177                                                 cmd.Run()
178                                         }
179                                 }
180                         }
181                         ctx.LogI("rx", sds, "")
182                         if !dryRun {
183                                 if doSeen {
184                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
185                                                 fd.Close()
186                                         }
187                                 }
188                                 if err = os.Remove(job.Fd.Name()); err != nil {
189                                         ctx.LogE("rx", sds, err, "remove")
190                                         isBad = true
191                                 }
192                         }
193                 case PktTypeFile:
194                         if noFile {
195                                 goto Closing
196                         }
197                         dst := string(pkt.Path[:int(pkt.PathLen)])
198                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
199                         if filepath.IsAbs(dst) {
200                                 ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
201                                 isBad = true
202                                 goto Closing
203                         }
204                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
205                         if incoming == nil {
206                                 ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
207                                 isBad = true
208                                 goto Closing
209                         }
210                         dir := filepath.Join(*incoming, path.Dir(dst))
211                         if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
212                                 ctx.LogE("rx", sds, err, "mkdir")
213                                 isBad = true
214                                 goto Closing
215                         }
216                         if !dryRun {
217                                 tmp, err := TempFile(dir, "file")
218                                 if err != nil {
219                                         ctx.LogE("rx", sds, err, "mktemp")
220                                         isBad = true
221                                         goto Closing
222                                 }
223                                 sds["tmp"] = tmp.Name()
224                                 ctx.LogD("rx", sds, "created")
225                                 bufW := bufio.NewWriter(tmp)
226                                 if _, err = CopyProgressed(
227                                         bufW,
228                                         pipeR,
229                                         SdsAdd(sds, SDS{"fullsize": sds["size"]}),
230                                         ctx.ShowPrgrs,
231                                 ); err != nil {
232                                         ctx.LogE("rx", sds, err, "copy")
233                                         isBad = true
234                                         goto Closing
235                                 }
236                                 if err = bufW.Flush(); err != nil {
237                                         tmp.Close()
238                                         ctx.LogE("rx", sds, err, "copy")
239                                         isBad = true
240                                         goto Closing
241                                 }
242                                 if err = tmp.Sync(); err != nil {
243                                         tmp.Close()
244                                         ctx.LogE("rx", sds, err, "copy")
245                                         isBad = true
246                                         goto Closing
247                                 }
248                                 tmp.Close()
249                                 dstPathOrig := filepath.Join(*incoming, dst)
250                                 dstPath := dstPathOrig
251                                 dstPathCtr := 0
252                                 for {
253                                         if _, err = os.Stat(dstPath); err != nil {
254                                                 if os.IsNotExist(err) {
255                                                         break
256                                                 }
257                                                 ctx.LogE("rx", sds, err, "stat")
258                                                 isBad = true
259                                                 goto Closing
260                                         }
261                                         dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
262                                         dstPathCtr++
263                                 }
264                                 if err = os.Rename(tmp.Name(), dstPath); err != nil {
265                                         ctx.LogE("rx", sds, err, "rename")
266                                         isBad = true
267                                 }
268                                 if err = DirSync(*incoming); err != nil {
269                                         ctx.LogE("rx", sds, err, "sync")
270                                         isBad = true
271                                 }
272                                 delete(sds, "tmp")
273                         }
274                         ctx.LogI("rx", sds, "")
275                         if !dryRun {
276                                 if doSeen {
277                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
278                                                 fd.Close()
279                                         }
280                                 }
281                                 if err = os.Remove(job.Fd.Name()); err != nil {
282                                         ctx.LogE("rx", sds, err, "remove")
283                                         isBad = true
284                                 }
285                                 if len(sendmail) > 0 && ctx.NotifyFile != nil {
286                                         cmd := exec.Command(
287                                                 sendmail[0],
288                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
289                                         )
290                                         cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
291                                                 "File from %s: %s (%s)",
292                                                 ctx.Neigh[*job.PktEnc.Sender].Name,
293                                                 dst,
294                                                 humanize.IBytes(uint64(pktSize)),
295                                         ), nil)
296                                         cmd.Run()
297                                 }
298                         }
299                 case PktTypeFreq:
300                         if noFreq {
301                                 goto Closing
302                         }
303                         src := string(pkt.Path[:int(pkt.PathLen)])
304                         if filepath.IsAbs(src) {
305                                 ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
306                                 isBad = true
307                                 goto Closing
308                         }
309                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
310                         dstRaw, err := ioutil.ReadAll(pipeR)
311                         if err != nil {
312                                 ctx.LogE("rx", sds, err, "read")
313                                 isBad = true
314                                 goto Closing
315                         }
316                         dst := string(dstRaw)
317                         sds["dst"] = dst
318                         sender := ctx.Neigh[*job.PktEnc.Sender]
319                         freqPath := sender.FreqPath
320                         if freqPath == nil {
321                                 ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
322                                 isBad = true
323                                 goto Closing
324                         }
325                         if !dryRun {
326                                 err = ctx.TxFile(
327                                         sender,
328                                         pkt.Nice,
329                                         filepath.Join(*freqPath, src),
330                                         dst,
331                                         sender.FreqChunked,
332                                         sender.FreqMinSize,
333                                         sender.FreqMaxSize,
334                                 )
335                                 if err != nil {
336                                         ctx.LogE("rx", sds, err, "tx file")
337                                         isBad = true
338                                         goto Closing
339                                 }
340                         }
341                         ctx.LogI("rx", sds, "")
342                         if !dryRun {
343                                 if doSeen {
344                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
345                                                 fd.Close()
346                                         }
347                                 }
348                                 if err = os.Remove(job.Fd.Name()); err != nil {
349                                         ctx.LogE("rx", sds, err, "remove")
350                                         isBad = true
351                                 }
352                                 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
353                                         cmd := exec.Command(
354                                                 sendmail[0],
355                                                 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
356                                         )
357                                         cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
358                                                 "Freq from %s: %s", sender.Name, src,
359                                         ), nil)
360                                         cmd.Run()
361                                 }
362                         }
363                 case PktTypeTrns:
364                         if noTrns {
365                                 goto Closing
366                         }
367                         dst := new([blake2b.Size256]byte)
368                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
369                         nodeId := NodeId(*dst)
370                         node, known := ctx.Neigh[nodeId]
371                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
372                         if !known {
373                                 ctx.LogE("rx", sds, errors.New("unknown node"), "")
374                                 isBad = true
375                                 goto Closing
376                         }
377                         ctx.LogD("rx", sds, "taken")
378                         if !dryRun {
379                                 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
380                                         ctx.LogE("rx", sds, err, "tx trns")
381                                         isBad = true
382                                         goto Closing
383                                 }
384                         }
385                         ctx.LogI("rx", sds, "")
386                         if !dryRun {
387                                 if doSeen {
388                                         if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
389                                                 fd.Close()
390                                         }
391                                 }
392                                 if err = os.Remove(job.Fd.Name()); err != nil {
393                                         ctx.LogE("rx", sds, err, "remove")
394                                         isBad = true
395                                 }
396                         }
397                 default:
398                         ctx.LogE("rx", sds, errors.New("unknown type"), "")
399                         isBad = true
400                 }
401         Closing:
402                 pipeR.Close()
403         }
404         return isBad
405 }