Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Remove hdr/ files during ACK tossing
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/ioutil"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
// SeenDir is the name of the subdirectory, located in the same directory
// as a job file, under which jobPath2Seen places that job's marker path.
const SeenDir = "seen"
48
49 func jobPath2Seen(jobPath string) string {
50         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
51 }
52
53 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
54         lines := []string{
55                 "From: " + fromTo.From,
56                 "To: " + fromTo.To,
57                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
58         }
59         if len(body) > 0 {
60                 lines = append(
61                         lines,
62                         "MIME-Version: 1.0",
63                         "Content-Type: text/plain; charset=utf-8",
64                         "Content-Transfer-Encoding: base64",
65                         "",
66                         base64.StdEncoding.EncodeToString(body),
67                 )
68         }
69         return strings.NewReader(strings.Join(lines, "\n"))
70 }
71
72 func pktSizeWithoutEnc(pktSize int64) int64 {
73         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
74         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
75         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
76                 pktSize -= poly1305.TagSize
77         }
78         pktSize -= pktSizeBlocks * poly1305.TagSize
79         return pktSize
80 }
81
// JobRepeatProcess is a sentinel error whose message is
// "needs processing repeat".
// NOTE(review): its producers and consumers are not visible in this part
// of the file; presumably it signals that a job has to be processed once
// more — confirm against the callers.
var (
	JobRepeatProcess = errors.New("needs processing repeat")
)
83
84 func jobProcess(
85         ctx *Ctx,
86         pipeR *io.PipeReader,
87         pktName string,
88         les LEs,
89         sender *Node,
90         nice uint8,
91         pktSize uint64,
92         jobPath string,
93         decompressor *zstd.Decoder,
94         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
95 ) error {
96         defer pipeR.Close()
97         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
98         var pkt Pkt
99         _, err := xdr.Unmarshal(pipeR, &pkt)
100         if err != nil {
101                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
102                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
103                 })
104                 return err
105         }
106         les = append(les, LE{"Size", int64(pktSize)})
107         ctx.LogD("rx", les, func(les LEs) string {
108                 return fmt.Sprintf(
109                         "Tossing %s/%s (%s)",
110                         sender.Name, pktName,
111                         humanize.IBytes(pktSize),
112                 )
113         })
114         switch pkt.Type {
115         case PktTypeExec, PktTypeExecFat:
116                 if noExec {
117                         return nil
118                 }
119                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
120                 handle := string(path[0])
121                 args := make([]string, 0, len(path)-1)
122                 for _, p := range path[1:] {
123                         args = append(args, string(p))
124                 }
125                 argsStr := strings.Join(append([]string{handle}, args...), " ")
126                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
127                 cmdline := sender.Exec[handle]
128                 if len(cmdline) == 0 {
129                         err = errors.New("No handle found")
130                         ctx.LogE(
131                                 "rx-no-handle", les, err,
132                                 func(les LEs) string {
133                                         return fmt.Sprintf(
134                                                 "Tossing exec %s/%s (%s): %s",
135                                                 sender.Name, pktName,
136                                                 humanize.IBytes(pktSize), argsStr,
137                                         )
138                                 },
139                         )
140                         return err
141                 }
142                 if pkt.Type == PktTypeExec {
143                         if err = decompressor.Reset(pipeR); err != nil {
144                                 log.Fatalln(err)
145                         }
146                 }
147                 if !dryRun {
148                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
149                         cmd.Env = append(
150                                 cmd.Env,
151                                 "NNCP_SELF="+ctx.Self.Id.String(),
152                                 "NNCP_SENDER="+sender.Id.String(),
153                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
154                         )
155                         if pkt.Type == PktTypeExec {
156                                 cmd.Stdin = decompressor
157                         } else {
158                                 cmd.Stdin = pipeR
159                         }
160                         output, err := cmd.CombinedOutput()
161                         if err != nil {
162                                 les = append(les, LE{"Output", strings.Split(
163                                         strings.Trim(string(output), "\n"), "\n"),
164                                 })
165                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
166                                         return fmt.Sprintf(
167                                                 "Tossing exec %s/%s (%s): %s: handling",
168                                                 sender.Name, pktName,
169                                                 humanize.IBytes(uint64(pktSize)), argsStr,
170                                         )
171                                 })
172                                 return err
173                         }
174                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
175                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
176                                 if notify == nil {
177                                         notify = ctx.NotifyExec["*."+handle]
178                                 }
179                                 if notify != nil {
180                                         cmd := exec.Command(
181                                                 sendmail[0],
182                                                 append(sendmail[1:], notify.To)...,
183                                         )
184                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
185                                                 "Exec from %s: %s", sender.Name, argsStr,
186                                         ), output)
187                                         if err = cmd.Run(); err != nil {
188                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
189                                                         return fmt.Sprintf(
190                                                                 "Tossing exec %s/%s (%s): %s: notifying",
191                                                                 sender.Name, pktName,
192                                                                 humanize.IBytes(pktSize), argsStr,
193                                                         )
194                                                 })
195                                         }
196                                 }
197                         }
198                 }
199                 ctx.LogI("rx", les, func(les LEs) string {
200                         return fmt.Sprintf(
201                                 "Got exec from %s to %s (%s)",
202                                 sender.Name, argsStr,
203                                 humanize.IBytes(pktSize),
204                         )
205                 })
206                 if !dryRun && jobPath != "" {
207                         if doSeen {
208                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
209                                         return err
210                                 }
211                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
212                                         fd.Close()
213                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
214                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
215                                                         return fmt.Sprintf(
216                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
217                                                                 sender.Name, pktName,
218                                                                 humanize.IBytes(pktSize),
219                                                                 filepath.Base(jobPath),
220                                                         )
221                                                 })
222                                                 return err
223                                         }
224                                 }
225                         }
226                         if err = os.Remove(jobPath); err != nil {
227                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
228                                         return fmt.Sprintf(
229                                                 "Tossing exec %s/%s (%s): %s: notifying",
230                                                 sender.Name, pktName,
231                                                 humanize.IBytes(pktSize), argsStr,
232                                         )
233                                 })
234                                 return err
235                         } else if ctx.HdrUsage {
236                                 os.Remove(JobPath2Hdr(jobPath))
237                         }
238                 }
239
240         case PktTypeFile:
241                 if noFile {
242                         return nil
243                 }
244                 dst := string(pkt.Path[:int(pkt.PathLen)])
245                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
246                 if filepath.IsAbs(dst) {
247                         err = errors.New("non-relative destination path")
248                         ctx.LogE(
249                                 "rx-non-rel", les, err,
250                                 func(les LEs) string {
251                                         return fmt.Sprintf(
252                                                 "Tossing file %s/%s (%s): %s",
253                                                 sender.Name, pktName,
254                                                 humanize.IBytes(pktSize), dst,
255                                         )
256                                 },
257                         )
258                         return err
259                 }
260                 incoming := sender.Incoming
261                 if incoming == nil {
262                         err = errors.New("incoming is not allowed")
263                         ctx.LogE(
264                                 "rx-no-incoming", les, err,
265                                 func(les LEs) string {
266                                         return fmt.Sprintf(
267                                                 "Tossing file %s/%s (%s): %s",
268                                                 sender.Name, pktName,
269                                                 humanize.IBytes(pktSize), dst,
270                                         )
271                                 },
272                         )
273                         return err
274                 }
275                 dir := filepath.Join(*incoming, path.Dir(dst))
276                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
277                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
278                                 return fmt.Sprintf(
279                                         "Tossing file %s/%s (%s): %s: mkdir",
280                                         sender.Name, pktName,
281                                         humanize.IBytes(pktSize), dst,
282                                 )
283                         })
284                         return err
285                 }
286                 if !dryRun {
287                         tmp, err := TempFile(dir, "file")
288                         if err != nil {
289                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
290                                         return fmt.Sprintf(
291                                                 "Tossing file %s/%s (%s): %s: mktemp",
292                                                 sender.Name, pktName,
293                                                 humanize.IBytes(pktSize), dst,
294                                         )
295                                 })
296                                 return err
297                         }
298                         les = append(les, LE{"Tmp", tmp.Name()})
299                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
300                                 return fmt.Sprintf(
301                                         "Tossing file %s/%s (%s): %s: created: %s",
302                                         sender.Name, pktName,
303                                         humanize.IBytes(pktSize), dst, tmp.Name(),
304                                 )
305                         })
306                         bufW := bufio.NewWriter(tmp)
307                         if _, err = CopyProgressed(
308                                 bufW, pipeR, "Rx file",
309                                 append(les, LE{"FullSize", int64(pktSize)}),
310                                 ctx.ShowPrgrs,
311                         ); err != nil {
312                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
313                                         return fmt.Sprintf(
314                                                 "Tossing file %s/%s (%s): %s: copying",
315                                                 sender.Name, pktName,
316                                                 humanize.IBytes(pktSize), dst,
317                                         )
318                                 })
319                                 return err
320                         }
321                         if err = bufW.Flush(); err != nil {
322                                 tmp.Close()
323                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
324                                         return fmt.Sprintf(
325                                                 "Tossing file %s/%s (%s): %s: flushing",
326                                                 sender.Name, pktName,
327                                                 humanize.IBytes(pktSize), dst,
328                                         )
329                                 })
330                                 return err
331                         }
332                         if !NoSync {
333                                 if err = tmp.Sync(); err != nil {
334                                         tmp.Close()
335                                         ctx.LogE("rx-sync", les, err, func(les LEs) string {
336                                                 return fmt.Sprintf(
337                                                         "Tossing file %s/%s (%s): %s: syncing",
338                                                         sender.Name, pktName,
339                                                         humanize.IBytes(pktSize), dst,
340                                                 )
341                                         })
342                                         return err
343                                 }
344                         }
345                         if err = tmp.Close(); err != nil {
346                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
347                                         return fmt.Sprintf(
348                                                 "Tossing file %s/%s (%s): %s: closing",
349                                                 sender.Name, pktName,
350                                                 humanize.IBytes(pktSize), dst,
351                                         )
352                                 })
353                                 return err
354                         }
355                         dstPathOrig := filepath.Join(*incoming, dst)
356                         dstPath := dstPathOrig
357                         dstPathCtr := 0
358                         for {
359                                 if _, err = os.Stat(dstPath); err != nil {
360                                         if os.IsNotExist(err) {
361                                                 break
362                                         }
363                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
364                                                 return fmt.Sprintf(
365                                                         "Tossing file %s/%s (%s): %s: stating: %s",
366                                                         sender.Name, pktName,
367                                                         humanize.IBytes(pktSize), dst, dstPath,
368                                                 )
369                                         })
370                                         return err
371                                 }
372                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
373                                 dstPathCtr++
374                         }
375                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
376                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
377                                         return fmt.Sprintf(
378                                                 "Tossing file %s/%s (%s): %s: renaming",
379                                                 sender.Name, pktName,
380                                                 humanize.IBytes(pktSize), dst,
381                                         )
382                                 })
383                                 return err
384                         }
385                         if err = DirSync(*incoming); err != nil {
386                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
387                                         return fmt.Sprintf(
388                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
389                                                 sender.Name, pktName,
390                                                 humanize.IBytes(pktSize), dst,
391                                         )
392                                 })
393                                 return err
394                         }
395                         les = les[:len(les)-1] // delete Tmp
396                 }
397                 ctx.LogI("rx", les, func(les LEs) string {
398                         return fmt.Sprintf(
399                                 "Got file %s (%s) from %s",
400                                 dst, humanize.IBytes(pktSize), sender.Name,
401                         )
402                 })
403                 if !dryRun {
404                         if jobPath != "" {
405                                 if doSeen {
406                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
407                                                 return err
408                                         }
409                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
410                                                 fd.Close()
411                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
412                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
413                                                                 return fmt.Sprintf(
414                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
415                                                                         sender.Name, pktName,
416                                                                         humanize.IBytes(pktSize),
417                                                                         filepath.Base(jobPath),
418                                                                 )
419                                                         })
420                                                         return err
421                                                 }
422                                         }
423                                 }
424                                 if err = os.Remove(jobPath); err != nil {
425                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
426                                                 return fmt.Sprintf(
427                                                         "Tossing file %s/%s (%s): %s: removing",
428                                                         sender.Name, pktName,
429                                                         humanize.IBytes(pktSize), dst,
430                                                 )
431                                         })
432                                         return err
433                                 } else if ctx.HdrUsage {
434                                         os.Remove(JobPath2Hdr(jobPath))
435                                 }
436                         }
437                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
438                                 cmd := exec.Command(
439                                         sendmail[0],
440                                         append(sendmail[1:], ctx.NotifyFile.To)...,
441                                 )
442                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
443                                         "File from %s: %s (%s)",
444                                         sender.Name, dst, humanize.IBytes(pktSize),
445                                 ), nil)
446                                 if err = cmd.Run(); err != nil {
447                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
448                                                 return fmt.Sprintf(
449                                                         "Tossing file %s/%s (%s): %s: notifying",
450                                                         sender.Name, pktName,
451                                                         humanize.IBytes(pktSize), dst,
452                                                 )
453                                         })
454                                 }
455                         }
456                 }
457
458         case PktTypeFreq:
459                 if noFreq {
460                         return nil
461                 }
462                 src := string(pkt.Path[:int(pkt.PathLen)])
463                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
464                 if filepath.IsAbs(src) {
465                         err = errors.New("non-relative source path")
466                         ctx.LogE(
467                                 "rx-non-rel", les, err,
468                                 func(les LEs) string {
469                                         return fmt.Sprintf(
470                                                 "Tossing freq %s/%s (%s): %s: notifying",
471                                                 sender.Name, pktName,
472                                                 humanize.IBytes(pktSize), src,
473                                         )
474                                 },
475                         )
476                         return err
477                 }
478                 dstRaw, err := ioutil.ReadAll(pipeR)
479                 if err != nil {
480                         ctx.LogE("rx-read", les, err, func(les LEs) string {
481                                 return fmt.Sprintf(
482                                         "Tossing freq %s/%s (%s): %s: reading",
483                                         sender.Name, pktName,
484                                         humanize.IBytes(pktSize), src,
485                                 )
486                         })
487                         return err
488                 }
489                 dst := string(dstRaw)
490                 les = append(les, LE{"Dst", dst})
491                 freqPath := sender.FreqPath
492                 if freqPath == nil {
493                         err = errors.New("freqing is not allowed")
494                         ctx.LogE(
495                                 "rx-no-freq", les, err,
496                                 func(les LEs) string {
497                                         return fmt.Sprintf(
498                                                 "Tossing freq %s/%s (%s): %s -> %s",
499                                                 sender.Name, pktName,
500                                                 humanize.IBytes(pktSize), src, dst,
501                                         )
502                                 },
503                         )
504                         return err
505                 }
506                 if !dryRun {
507                         err = ctx.TxFile(
508                                 sender,
509                                 pkt.Nice,
510                                 filepath.Join(*freqPath, src),
511                                 dst,
512                                 sender.FreqChunked,
513                                 sender.FreqMinSize,
514                                 sender.FreqMaxSize,
515                                 nil,
516                         )
517                         if err != nil {
518                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
519                                         return fmt.Sprintf(
520                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
521                                                 sender.Name, pktName,
522                                                 humanize.IBytes(pktSize), src, dst,
523                                         )
524                                 })
525                                 return err
526                         }
527                 }
528                 ctx.LogI("rx", les, func(les LEs) string {
529                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
530                 })
531                 if !dryRun {
532                         if jobPath != "" {
533                                 if doSeen {
534                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
535                                                 return err
536                                         }
537                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
538                                                 fd.Close()
539                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
540                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
541                                                                 return fmt.Sprintf(
542                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
543                                                                         sender.Name, pktName,
544                                                                         humanize.IBytes(pktSize),
545                                                                         filepath.Base(jobPath),
546                                                                 )
547                                                         })
548                                                         return err
549                                                 }
550                                         }
551                                 }
552                                 if err = os.Remove(jobPath); err != nil {
553                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
554                                                 return fmt.Sprintf(
555                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
556                                                         sender.Name, pktName,
557                                                         humanize.IBytes(pktSize), src, dst,
558                                                 )
559                                         })
560                                         return err
561                                 } else if ctx.HdrUsage {
562                                         os.Remove(JobPath2Hdr(jobPath))
563                                 }
564                         }
565                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
566                                 cmd := exec.Command(
567                                         sendmail[0],
568                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
569                                 )
570                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
571                                         "Freq from %s: %s", sender.Name, src,
572                                 ), nil)
573                                 if err = cmd.Run(); err != nil {
574                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
575                                                 return fmt.Sprintf(
576                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
577                                                         sender.Name, pktName,
578                                                         humanize.IBytes(pktSize), src, dst,
579                                                 )
580                                         })
581                                 }
582                         }
583                 }
584
585         case PktTypeTrns:
586                 if noTrns {
587                         return nil
588                 }
589                 dst := new([MTHSize]byte)
590                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
591                 nodeId := NodeId(*dst)
592                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
593                 logMsg := func(les LEs) string {
594                         return fmt.Sprintf(
595                                 "Tossing trns %s/%s (%s): %s",
596                                 sender.Name, pktName,
597                                 humanize.IBytes(pktSize),
598                                 nodeId.String(),
599                         )
600                 }
601                 node := ctx.Neigh[nodeId]
602                 if node == nil {
603                         err = errors.New("unknown node")
604                         ctx.LogE("rx-unknown", les, err, logMsg)
605                         return err
606                 }
607                 ctx.LogD("rx-tx", les, logMsg)
608                 if !dryRun {
609                         if len(node.Via) == 0 {
610                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
611                                         ctx.LogE("rx", les, err, func(les LEs) string {
612                                                 return logMsg(les) + ": txing"
613                                         })
614                                         return err
615                                 }
616                         } else {
617                                 via := node.Via[:len(node.Via)-1]
618                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
619                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
620                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
621                                 if err != nil {
622                                         panic(err)
623                                 }
624                                 if _, _, err = ctx.Tx(
625                                         node,
626                                         pktTrns,
627                                         nice,
628                                         int64(pktSize), 0, MaxFileSize,
629                                         pipeR,
630                                         pktName,
631                                         nil,
632                                 ); err != nil {
633                                         ctx.LogE("rx", les, err, func(les LEs) string {
634                                                 return logMsg(les) + ": txing"
635                                         })
636                                         return err
637                                 }
638                         }
639                 }
640                 ctx.LogI("rx", les, func(les LEs) string {
641                         return fmt.Sprintf(
642                                 "Got transitional packet from %s to %s (%s)",
643                                 sender.Name,
644                                 ctx.NodeName(&nodeId),
645                                 humanize.IBytes(pktSize),
646                         )
647                 })
648                 if !dryRun && jobPath != "" {
649                         if doSeen {
650                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
651                                         return err
652                                 }
653                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
654                                         fd.Close()
655                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
656                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
657                                                         return fmt.Sprintf(
658                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
659                                                                 sender.Name, pktName,
660                                                                 humanize.IBytes(pktSize),
661                                                                 filepath.Base(jobPath),
662                                                         )
663                                                 })
664                                                 return err
665                                         }
666                                 }
667                         }
668                         if err = os.Remove(jobPath); err != nil {
669                                 ctx.LogE("rx", les, err, func(les LEs) string {
670                                         return fmt.Sprintf(
671                                                 "Tossing trns %s/%s (%s): %s: removing",
672                                                 sender.Name, pktName,
673                                                 humanize.IBytes(pktSize),
674                                                 ctx.NodeName(&nodeId),
675                                         )
676                                 })
677                                 return err
678                         } else if ctx.HdrUsage {
679                                 os.Remove(JobPath2Hdr(jobPath))
680                         }
681                 }
682
683         case PktTypeArea:
684                 if noArea {
685                         return nil
686                 }
687                 areaId := new(AreaId)
688                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
689                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
690                 logMsg := func(les LEs) string {
691                         return fmt.Sprintf(
692                                 "Tossing %s/%s (%s): area %s",
693                                 sender.Name, pktName,
694                                 humanize.IBytes(pktSize),
695                                 ctx.AreaName(areaId),
696                         )
697                 }
698                 area := ctx.AreaId2Area[*areaId]
699                 if area == nil {
700                         err = errors.New("unknown area")
701                         ctx.LogE("rx-area-unknown", les, err, logMsg)
702                         return err
703                 }
704                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
705                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
706                 if err != nil {
707                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
708                         return err
709                 }
710                 msgHashRaw := blake2b.Sum256(pktEncRaw)
711                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
712                 les = append(les, LE{"AreaMsg", msgHash})
713                 ctx.LogD("rx-area", les, logMsg)
714
715                 if dryRun {
716                         for _, nodeId := range area.Subs {
717                                 node := ctx.Neigh[*nodeId]
718                                 lesEcho := append(les, LE{"Echo", nodeId})
719                                 seenDir := filepath.Join(
720                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
721                                 )
722                                 seenPath := filepath.Join(seenDir, msgHash)
723                                 logMsgNode := func(les LEs) string {
724                                         return fmt.Sprintf(
725                                                 "%s: echoing to: %s", logMsg(les), node.Name,
726                                         )
727                                 }
728                                 if _, err := os.Stat(seenPath); err == nil {
729                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
730                                                 return logMsgNode(les) + ": already sent"
731                                         })
732                                         continue
733                                 }
734                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
735                         }
736                 } else {
737                         for _, nodeId := range area.Subs {
738                                 node := ctx.Neigh[*nodeId]
739                                 lesEcho := append(les, LE{"Echo", nodeId})
740                                 seenDir := filepath.Join(
741                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
742                                 )
743                                 seenPath := filepath.Join(seenDir, msgHash)
744                                 logMsgNode := func(les LEs) string {
745                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
746                                 }
747                                 if _, err := os.Stat(seenPath); err == nil {
748                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
749                                                 return logMsgNode(les) + ": already sent"
750                                         })
751                                         continue
752                                 }
753                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
754                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
755                                         if _, _, err = ctx.Tx(
756                                                 node,
757                                                 &pkt,
758                                                 nice,
759                                                 int64(pktSize), 0, MaxFileSize,
760                                                 fullPipeR,
761                                                 pktName,
762                                                 nil,
763                                         ); err != nil {
764                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
765                                                 return err
766                                         }
767                                 }
768                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
769                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
770                                         return err
771                                 }
772                                 if fd, err := os.Create(seenPath); err == nil {
773                                         fd.Close()
774                                         if err = DirSync(seenDir); err != nil {
775                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
776                                                 return err
777                                         }
778                                 } else {
779                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
780                                         return err
781                                 }
782                                 return JobRepeatProcess
783                         }
784                 }
785
786                 seenDir := filepath.Join(
787                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
788                 )
789                 seenPath := filepath.Join(seenDir, msgHash)
790                 if _, err := os.Stat(seenPath); err == nil {
791                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
792                                 return logMsg(les) + ": already seen"
793                         })
794                         if !dryRun && jobPath != "" {
795                                 if err = os.Remove(jobPath); err != nil {
796                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
797                                                 return fmt.Sprintf(
798                                                         "Tossing area %s/%s (%s): %s: removing",
799                                                         sender.Name, pktName,
800                                                         humanize.IBytes(pktSize),
801                                                         msgHash,
802                                                 )
803                                         })
804                                         return err
805                                 } else if ctx.HdrUsage {
806                                         os.Remove(JobPath2Hdr(jobPath))
807                                 }
808                         }
809                         return nil
810                 }
811
812                 if area.Prv == nil {
813                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
814                                 return logMsg(les) + ": no private key for decoding"
815                         })
816                 } else {
817                         signatureVerify := true
818                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
819                                 if !area.AllowUnknown {
820                                         err = errors.New("unknown sender")
821                                         ctx.LogE(
822                                                 "rx-area-unknown",
823                                                 append(les, LE{"Sender", pktEnc.Sender}),
824                                                 err,
825                                                 func(les LEs) string {
826                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
827                                                 },
828                                         )
829                                         return err
830                                 }
831                                 signatureVerify = false
832                         }
833                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
834                         copy(areaNodeOur.Id[:], area.Id[:])
835                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
836                         areaNode := Node{
837                                 Id:       new(NodeId),
838                                 Name:     area.Name,
839                                 Incoming: area.Incoming,
840                                 Exec:     area.Exec,
841                         }
842                         copy(areaNode.Id[:], area.Id[:])
843                         pktName := fmt.Sprintf(
844                                 "area/%s/%s",
845                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
846                         )
847
848                         pipeR, pipeW := io.Pipe()
849                         errs := make(chan error, 1)
850                         go func() {
851                                 errs <- jobProcess(
852                                         ctx,
853                                         pipeR,
854                                         pktName,
855                                         les,
856                                         &areaNode,
857                                         nice,
858                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
859                                         "",
860                                         decompressor,
861                                         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
862                                 )
863                         }()
864                         _, _, _, err = PktEncRead(
865                                 &areaNodeOur,
866                                 ctx.Neigh,
867                                 fullPipeR,
868                                 pipeW,
869                                 signatureVerify,
870                                 nil,
871                         )
872                         if err != nil {
873                                 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
874                                 pipeW.CloseWithError(err)
875                                 <-errs
876                                 return err
877                         }
878                         pipeW.Close()
879                         if err = <-errs; err != nil {
880                                 return err
881                         }
882                 }
883
884                 if !dryRun && jobPath != "" {
885                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
886                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
887                                 return err
888                         }
889                         if fd, err := os.Create(seenPath); err == nil {
890                                 fd.Close()
891                                 if err = DirSync(seenDir); err != nil {
892                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
893                                         return err
894                                 }
895                         }
896                         if err = os.Remove(jobPath); err != nil {
897                                 ctx.LogE("rx", les, err, func(les LEs) string {
898                                         return fmt.Sprintf(
899                                                 "Tossing area %s/%s (%s): %s: removing",
900                                                 sender.Name, pktName,
901                                                 humanize.IBytes(pktSize),
902                                                 msgHash,
903                                         )
904                                 })
905                                 return err
906                         } else if ctx.HdrUsage {
907                                 os.Remove(JobPath2Hdr(jobPath))
908                         }
909                 }
910
911         case PktTypeACK:
912                 if noACK {
913                         return nil
914                 }
915                 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
916                 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
917                 logMsg := func(les LEs) string {
918                         return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
919                 }
920                 ctx.LogD("rx-ack", les, logMsg)
921                 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
922                 if _, err := os.Stat(pktPath); err == nil {
923                         if !dryRun {
924                                 if err = os.Remove(pktPath); err != nil {
925                                         ctx.LogE("rx-ack", les, err, func(les LEs) string {
926                                                 return logMsg(les) + ": removing packet"
927                                         })
928                                         return err
929                                 } else if ctx.HdrUsage {
930                                         os.Remove(JobPath2Hdr(pktPath))
931                                 }
932                         }
933                 } else {
934                         ctx.LogD("rx-ack", les, func(les LEs) string {
935                                 return logMsg(les) + ": already disappeared"
936                         })
937                 }
938                 if !dryRun && doSeen {
939                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
940                                 return err
941                         }
942                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
943                                 fd.Close()
944                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
945                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
946                                                 return fmt.Sprintf(
947                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
948                                                         sender.Name, pktName,
949                                                         humanize.IBytes(pktSize),
950                                                         filepath.Base(jobPath),
951                                                 )
952                                         })
953                                         return err
954                                 }
955                         }
956                 }
957                 if !dryRun {
958                         if err = os.Remove(jobPath); err != nil {
959                                 ctx.LogE("rx", les, err, func(les LEs) string {
960                                         return logMsg(les) + ": removing job"
961                                 })
962                                 return err
963                         } else if ctx.HdrUsage {
964                                 os.Remove(JobPath2Hdr(jobPath))
965                         }
966                 }
967                 ctx.LogI("rx", les, func(les LEs) string {
968                         return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
969                 })
970
971         default:
972                 err = errors.New("unknown type")
973                 ctx.LogE(
974                         "rx-type-unknown", les, err,
975                         func(les LEs) string {
976                                 return fmt.Sprintf(
977                                         "Tossing %s/%s (%s)",
978                                         sender.Name, pktName, humanize.IBytes(pktSize),
979                                 )
980                         },
981                 )
982                 return err
983         }
984         return nil
985 }
986
987 func (ctx *Ctx) Toss(
988         nodeId *NodeId,
989         xx TRxTx,
990         nice uint8,
991         dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
992 ) bool {
993         dirLock, err := ctx.LockDir(nodeId, "toss")
994         if err != nil {
995                 return false
996         }
997         defer ctx.UnlockDir(dirLock)
998         isBad := false
999         decompressor, err := zstd.NewReader(nil)
1000         if err != nil {
1001                 panic(err)
1002         }
1003         defer decompressor.Close()
1004         for job := range ctx.Jobs(nodeId, xx) {
1005                 pktName := filepath.Base(job.Path)
1006                 les := LEs{
1007                         {"Node", job.PktEnc.Sender},
1008                         {"Pkt", pktName},
1009                         {"Nice", int(job.PktEnc.Nice)},
1010                 }
1011                 if job.PktEnc.Nice > nice {
1012                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
1013                                 return fmt.Sprintf(
1014                                         "Tossing %s/%s: too nice: %s",
1015                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1016                                         NicenessFmt(job.PktEnc.Nice),
1017                                 )
1018                         })
1019                         continue
1020                 }
1021                 fd, err := os.Open(job.Path)
1022                 if err != nil {
1023                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1024                                 return fmt.Sprintf(
1025                                         "Tossing %s/%s: opening %s",
1026                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
1027                                 )
1028                         })
1029                         isBad = true
1030                         continue
1031                 }
1032                 sender := ctx.Neigh[*job.PktEnc.Sender]
1033                 if sender == nil {
1034                         err := errors.New("unknown node")
1035                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1036                                 return fmt.Sprintf(
1037                                         "Tossing %s/%s",
1038                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1039                                 )
1040                         })
1041                         isBad = true
1042                         continue
1043                 }
1044                 errs := make(chan error, 1)
1045                 var sharedKey []byte
1046         Retry:
1047                 pipeR, pipeW := io.Pipe()
1048                 go func() {
1049                         errs <- jobProcess(
1050                                 ctx,
1051                                 pipeR,
1052                                 pktName,
1053                                 les,
1054                                 sender,
1055                                 job.PktEnc.Nice,
1056                                 uint64(pktSizeWithoutEnc(job.Size)),
1057                                 job.Path,
1058                                 decompressor,
1059                                 dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK,
1060                         )
1061                 }()
1062                 pipeWB := bufio.NewWriter(pipeW)
1063                 sharedKey, _, _, err = PktEncRead(
1064                         ctx.Self,
1065                         ctx.Neigh,
1066                         bufio.NewReader(fd),
1067                         pipeWB,
1068                         sharedKey == nil,
1069                         sharedKey,
1070                 )
1071                 if err != nil {
1072                         pipeW.CloseWithError(err)
1073                 }
1074                 if err := pipeWB.Flush(); err != nil {
1075                         pipeW.CloseWithError(err)
1076                 }
1077                 pipeW.Close()
1078
1079                 if err != nil {
1080                         isBad = true
1081                         fd.Close()
1082                         <-errs
1083                         continue
1084                 }
1085                 if err = <-errs; err == JobRepeatProcess {
1086                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1087                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1088                                         return fmt.Sprintf(
1089                                                 "Tossing %s/%s: can not seek",
1090                                                 ctx.NodeName(job.PktEnc.Sender),
1091                                                 pktName,
1092                                         )
1093                                 })
1094                                 isBad = true
1095                                 break
1096                         }
1097                         goto Retry
1098                 } else if err != nil {
1099                         isBad = true
1100                 }
1101                 fd.Close()
1102         }
1103         return isBad
1104 }
1105
1106 func (ctx *Ctx) AutoToss(
1107         nodeId *NodeId,
1108         nice uint8,
1109         doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool,
1110 ) (chan struct{}, chan bool) {
1111         dw, err := ctx.NewDirWatcher(
1112                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1113                 time.Second,
1114         )
1115         if err != nil {
1116                 log.Fatalln(err)
1117         }
1118         finish := make(chan struct{})
1119         badCode := make(chan bool)
1120         go func() {
1121                 bad := false
1122                 for {
1123                         select {
1124                         case <-finish:
1125                                 dw.Close()
1126                                 badCode <- bad
1127                                 return
1128                         case <-dw.C:
1129                                 bad = !ctx.Toss(
1130                                         nodeId, TRx, nice, false,
1131                                         doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad
1132                         }
1133                 }
1134         }()
1135         return finish, badCode
1136 }