]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Generate ACKs during tossing
[nncp.git] / src / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bufio"
22         "bytes"
23         "encoding/base64"
24         "errors"
25         "fmt"
26         "io"
27         "io/fs"
28         "log"
29         "mime"
30         "os"
31         "os/exec"
32         "path"
33         "path/filepath"
34         "strconv"
35         "strings"
36         "time"
37
38         xdr "github.com/davecgh/go-xdr/xdr2"
39         "github.com/dustin/go-humanize"
40         "github.com/klauspost/compress/zstd"
41         "golang.org/x/crypto/blake2b"
42         "golang.org/x/crypto/poly1305"
43 )
44
45 const (
46         SeenDir = "seen"
47         ACKDir  = "ack"
48 )
49
50 type TossOpts struct {
51         Nice   uint8
52         DryRun bool
53         DoSeen bool
54         NoFile bool
55         NoFreq bool
56         NoExec bool
57         NoTrns bool
58         NoArea bool
59         NoACK  bool
60         GenACK bool
61 }
62
63 func jobPath2Seen(jobPath string) string {
64         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
65 }
66
67 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
68         lines := []string{
69                 "From: " + fromTo.From,
70                 "To: " + fromTo.To,
71                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
72         }
73         if len(body) > 0 {
74                 lines = append(
75                         lines,
76                         "MIME-Version: 1.0",
77                         "Content-Type: text/plain; charset=utf-8",
78                         "Content-Transfer-Encoding: base64",
79                         "",
80                         base64.StdEncoding.EncodeToString(body),
81                 )
82         }
83         return strings.NewReader(strings.Join(lines, "\n"))
84 }
85
86 func pktSizeWithoutEnc(pktSize int64) int64 {
87         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
88         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
89         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
90                 pktSize -= poly1305.TagSize
91         }
92         pktSize -= pktSizeBlocks * poly1305.TagSize
93         return pktSize
94 }
95
96 var JobRepeatProcess = errors.New("needs processing repeat")
97
98 func jobProcess(
99         ctx *Ctx,
100         pipeR *io.PipeReader,
101         pktName string,
102         les LEs,
103         sender *Node,
104         nice uint8,
105         pktSize uint64,
106         jobPath string,
107         decompressor *zstd.Decoder,
108         opts *TossOpts,
109 ) error {
110         defer pipeR.Close()
111         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
112         var pkt Pkt
113         _, err := xdr.Unmarshal(pipeR, &pkt)
114         if err != nil {
115                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
116                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
117                 })
118                 return err
119         }
120         les = append(les, LE{"Size", int64(pktSize)})
121         ctx.LogD("rx", les, func(les LEs) string {
122                 return fmt.Sprintf(
123                         "Tossing %s/%s (%s)",
124                         sender.Name, pktName,
125                         humanize.IBytes(pktSize),
126                 )
127         })
128         if opts.GenACK && pkt.Type != PktTypeACK {
129                 newPktName, err := ctx.TxACK(
130                         sender, sender.ACKNice, pktName, sender.ACKMinSize,
131                 )
132                 if err != nil {
133                         ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
134                                 return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName)
135                         })
136                         return err
137                 }
138                 ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir)
139                 os.MkdirAll(ackDir, os.FileMode(0777))
140                 if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil {
141                         fd.Close()
142                         if err = DirSync(ackDir); err != nil {
143                                 ctx.LogE("rx-genack", les, err, func(les LEs) string {
144                                         return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
145                                 })
146                                 return err
147                         }
148                 } else {
149                         ctx.LogE("rx-genack", les, err, func(les LEs) string {
150                                 return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
151                         })
152                         return err
153                 }
154         }
155         switch pkt.Type {
156         case PktTypeExec, PktTypeExecFat:
157                 if opts.NoExec {
158                         return nil
159                 }
160                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
161                 handle := string(path[0])
162                 args := make([]string, 0, len(path)-1)
163                 for _, p := range path[1:] {
164                         args = append(args, string(p))
165                 }
166                 argsStr := strings.Join(append([]string{handle}, args...), " ")
167                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
168                 cmdline := sender.Exec[handle]
169                 if len(cmdline) == 0 {
170                         err = errors.New("No handle found")
171                         ctx.LogE(
172                                 "rx-no-handle", les, err,
173                                 func(les LEs) string {
174                                         return fmt.Sprintf(
175                                                 "Tossing exec %s/%s (%s): %s",
176                                                 sender.Name, pktName,
177                                                 humanize.IBytes(pktSize), argsStr,
178                                         )
179                                 },
180                         )
181                         return err
182                 }
183                 if pkt.Type == PktTypeExec {
184                         if err = decompressor.Reset(pipeR); err != nil {
185                                 log.Fatalln(err)
186                         }
187                 }
188                 if !opts.DryRun {
189                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
190                         cmd.Env = append(
191                                 cmd.Env,
192                                 "NNCP_SELF="+ctx.Self.Id.String(),
193                                 "NNCP_SENDER="+sender.Id.String(),
194                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
195                         )
196                         if pkt.Type == PktTypeExec {
197                                 cmd.Stdin = decompressor
198                         } else {
199                                 cmd.Stdin = pipeR
200                         }
201                         output, err := cmd.CombinedOutput()
202                         if err != nil {
203                                 les = append(les, LE{"Output", strings.Split(
204                                         strings.Trim(string(output), "\n"), "\n"),
205                                 })
206                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
207                                         return fmt.Sprintf(
208                                                 "Tossing exec %s/%s (%s): %s: handling",
209                                                 sender.Name, pktName,
210                                                 humanize.IBytes(uint64(pktSize)), argsStr,
211                                         )
212                                 })
213                                 return err
214                         }
215                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
216                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
217                                 if notify == nil {
218                                         notify = ctx.NotifyExec["*."+handle]
219                                 }
220                                 if notify != nil {
221                                         cmd := exec.Command(
222                                                 sendmail[0],
223                                                 append(sendmail[1:], notify.To)...,
224                                         )
225                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
226                                                 "Exec from %s: %s", sender.Name, argsStr,
227                                         ), output)
228                                         if err = cmd.Run(); err != nil {
229                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
230                                                         return fmt.Sprintf(
231                                                                 "Tossing exec %s/%s (%s): %s: notifying",
232                                                                 sender.Name, pktName,
233                                                                 humanize.IBytes(pktSize), argsStr,
234                                                         )
235                                                 })
236                                         }
237                                 }
238                         }
239                 }
240                 ctx.LogI("rx", les, func(les LEs) string {
241                         return fmt.Sprintf(
242                                 "Got exec from %s to %s (%s)",
243                                 sender.Name, argsStr,
244                                 humanize.IBytes(pktSize),
245                         )
246                 })
247                 if !opts.DryRun && jobPath != "" {
248                         if opts.DoSeen {
249                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
250                                         return err
251                                 }
252                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
253                                         fd.Close()
254                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
255                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
256                                                         return fmt.Sprintf(
257                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
258                                                                 sender.Name, pktName,
259                                                                 humanize.IBytes(pktSize),
260                                                                 filepath.Base(jobPath),
261                                                         )
262                                                 })
263                                                 return err
264                                         }
265                                 }
266                         }
267                         if err = os.Remove(jobPath); err != nil {
268                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
269                                         return fmt.Sprintf(
270                                                 "Tossing exec %s/%s (%s): %s: notifying",
271                                                 sender.Name, pktName,
272                                                 humanize.IBytes(pktSize), argsStr,
273                                         )
274                                 })
275                                 return err
276                         } else if ctx.HdrUsage {
277                                 os.Remove(JobPath2Hdr(jobPath))
278                         }
279                 }
280
281         case PktTypeFile:
282                 if opts.NoFile {
283                         return nil
284                 }
285                 dst := string(pkt.Path[:int(pkt.PathLen)])
286                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
287                 if filepath.IsAbs(dst) {
288                         err = errors.New("non-relative destination path")
289                         ctx.LogE(
290                                 "rx-non-rel", les, err,
291                                 func(les LEs) string {
292                                         return fmt.Sprintf(
293                                                 "Tossing file %s/%s (%s): %s",
294                                                 sender.Name, pktName,
295                                                 humanize.IBytes(pktSize), dst,
296                                         )
297                                 },
298                         )
299                         return err
300                 }
301                 incoming := sender.Incoming
302                 if incoming == nil {
303                         err = errors.New("incoming is not allowed")
304                         ctx.LogE(
305                                 "rx-no-incoming", les, err,
306                                 func(les LEs) string {
307                                         return fmt.Sprintf(
308                                                 "Tossing file %s/%s (%s): %s",
309                                                 sender.Name, pktName,
310                                                 humanize.IBytes(pktSize), dst,
311                                         )
312                                 },
313                         )
314                         return err
315                 }
316                 dir := filepath.Join(*incoming, path.Dir(dst))
317                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
318                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
319                                 return fmt.Sprintf(
320                                         "Tossing file %s/%s (%s): %s: mkdir",
321                                         sender.Name, pktName,
322                                         humanize.IBytes(pktSize), dst,
323                                 )
324                         })
325                         return err
326                 }
327                 if !opts.DryRun {
328                         tmp, err := TempFile(dir, "file")
329                         if err != nil {
330                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
331                                         return fmt.Sprintf(
332                                                 "Tossing file %s/%s (%s): %s: mktemp",
333                                                 sender.Name, pktName,
334                                                 humanize.IBytes(pktSize), dst,
335                                         )
336                                 })
337                                 return err
338                         }
339                         les = append(les, LE{"Tmp", tmp.Name()})
340                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
341                                 return fmt.Sprintf(
342                                         "Tossing file %s/%s (%s): %s: created: %s",
343                                         sender.Name, pktName,
344                                         humanize.IBytes(pktSize), dst, tmp.Name(),
345                                 )
346                         })
347                         bufW := bufio.NewWriter(tmp)
348                         if _, err = CopyProgressed(
349                                 bufW, pipeR, "Rx file",
350                                 append(les, LE{"FullSize", int64(pktSize)}),
351                                 ctx.ShowPrgrs,
352                         ); err != nil {
353                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
354                                         return fmt.Sprintf(
355                                                 "Tossing file %s/%s (%s): %s: copying",
356                                                 sender.Name, pktName,
357                                                 humanize.IBytes(pktSize), dst,
358                                         )
359                                 })
360                                 return err
361                         }
362                         if err = bufW.Flush(); err != nil {
363                                 tmp.Close()
364                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
365                                         return fmt.Sprintf(
366                                                 "Tossing file %s/%s (%s): %s: flushing",
367                                                 sender.Name, pktName,
368                                                 humanize.IBytes(pktSize), dst,
369                                         )
370                                 })
371                                 return err
372                         }
373                         if !NoSync {
374                                 if err = tmp.Sync(); err != nil {
375                                         tmp.Close()
376                                         ctx.LogE("rx-sync", les, err, func(les LEs) string {
377                                                 return fmt.Sprintf(
378                                                         "Tossing file %s/%s (%s): %s: syncing",
379                                                         sender.Name, pktName,
380                                                         humanize.IBytes(pktSize), dst,
381                                                 )
382                                         })
383                                         return err
384                                 }
385                         }
386                         if err = tmp.Close(); err != nil {
387                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
388                                         return fmt.Sprintf(
389                                                 "Tossing file %s/%s (%s): %s: closing",
390                                                 sender.Name, pktName,
391                                                 humanize.IBytes(pktSize), dst,
392                                         )
393                                 })
394                                 return err
395                         }
396                         dstPathOrig := filepath.Join(*incoming, dst)
397                         dstPath := dstPathOrig
398                         dstPathCtr := 0
399                         for {
400                                 if _, err = os.Stat(dstPath); err != nil {
401                                         if errors.Is(err, fs.ErrNotExist) {
402                                                 break
403                                         }
404                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
405                                                 return fmt.Sprintf(
406                                                         "Tossing file %s/%s (%s): %s: stating: %s",
407                                                         sender.Name, pktName,
408                                                         humanize.IBytes(pktSize), dst, dstPath,
409                                                 )
410                                         })
411                                         return err
412                                 }
413                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
414                                 dstPathCtr++
415                         }
416                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
417                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
418                                         return fmt.Sprintf(
419                                                 "Tossing file %s/%s (%s): %s: renaming",
420                                                 sender.Name, pktName,
421                                                 humanize.IBytes(pktSize), dst,
422                                         )
423                                 })
424                                 return err
425                         }
426                         if err = DirSync(*incoming); err != nil {
427                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
428                                         return fmt.Sprintf(
429                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
430                                                 sender.Name, pktName,
431                                                 humanize.IBytes(pktSize), dst,
432                                         )
433                                 })
434                                 return err
435                         }
436                         les = les[:len(les)-1] // delete Tmp
437                 }
438                 ctx.LogI("rx", les, func(les LEs) string {
439                         return fmt.Sprintf(
440                                 "Got file %s (%s) from %s",
441                                 dst, humanize.IBytes(pktSize), sender.Name,
442                         )
443                 })
444                 if !opts.DryRun {
445                         if jobPath != "" {
446                                 if opts.DoSeen {
447                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
448                                                 return err
449                                         }
450                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
451                                                 fd.Close()
452                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
453                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
454                                                                 return fmt.Sprintf(
455                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
456                                                                         sender.Name, pktName,
457                                                                         humanize.IBytes(pktSize),
458                                                                         filepath.Base(jobPath),
459                                                                 )
460                                                         })
461                                                         return err
462                                                 }
463                                         }
464                                 }
465                                 if err = os.Remove(jobPath); err != nil {
466                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
467                                                 return fmt.Sprintf(
468                                                         "Tossing file %s/%s (%s): %s: removing",
469                                                         sender.Name, pktName,
470                                                         humanize.IBytes(pktSize), dst,
471                                                 )
472                                         })
473                                         return err
474                                 } else if ctx.HdrUsage {
475                                         os.Remove(JobPath2Hdr(jobPath))
476                                 }
477                         }
478                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
479                                 cmd := exec.Command(
480                                         sendmail[0],
481                                         append(sendmail[1:], ctx.NotifyFile.To)...,
482                                 )
483                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
484                                         "File from %s: %s (%s)",
485                                         sender.Name, dst, humanize.IBytes(pktSize),
486                                 ), nil)
487                                 if err = cmd.Run(); err != nil {
488                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
489                                                 return fmt.Sprintf(
490                                                         "Tossing file %s/%s (%s): %s: notifying",
491                                                         sender.Name, pktName,
492                                                         humanize.IBytes(pktSize), dst,
493                                                 )
494                                         })
495                                 }
496                         }
497                 }
498
499         case PktTypeFreq:
500                 if opts.NoFreq {
501                         return nil
502                 }
503                 src := string(pkt.Path[:int(pkt.PathLen)])
504                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
505                 if filepath.IsAbs(src) {
506                         err = errors.New("non-relative source path")
507                         ctx.LogE(
508                                 "rx-non-rel", les, err,
509                                 func(les LEs) string {
510                                         return fmt.Sprintf(
511                                                 "Tossing freq %s/%s (%s): %s: notifying",
512                                                 sender.Name, pktName,
513                                                 humanize.IBytes(pktSize), src,
514                                         )
515                                 },
516                         )
517                         return err
518                 }
519                 dstRaw, err := io.ReadAll(pipeR)
520                 if err != nil {
521                         ctx.LogE("rx-read", les, err, func(les LEs) string {
522                                 return fmt.Sprintf(
523                                         "Tossing freq %s/%s (%s): %s: reading",
524                                         sender.Name, pktName,
525                                         humanize.IBytes(pktSize), src,
526                                 )
527                         })
528                         return err
529                 }
530                 dst := string(dstRaw)
531                 les = append(les, LE{"Dst", dst})
532                 freqPath := sender.FreqPath
533                 if freqPath == nil {
534                         err = errors.New("freqing is not allowed")
535                         ctx.LogE(
536                                 "rx-no-freq", les, err,
537                                 func(les LEs) string {
538                                         return fmt.Sprintf(
539                                                 "Tossing freq %s/%s (%s): %s -> %s",
540                                                 sender.Name, pktName,
541                                                 humanize.IBytes(pktSize), src, dst,
542                                         )
543                                 },
544                         )
545                         return err
546                 }
547                 if !opts.DryRun {
548                         err = ctx.TxFile(
549                                 sender,
550                                 pkt.Nice,
551                                 filepath.Join(*freqPath, src),
552                                 dst,
553                                 sender.FreqChunked,
554                                 sender.FreqMinSize,
555                                 sender.FreqMaxSize,
556                                 nil,
557                         )
558                         if err != nil {
559                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
560                                         return fmt.Sprintf(
561                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
562                                                 sender.Name, pktName,
563                                                 humanize.IBytes(pktSize), src, dst,
564                                         )
565                                 })
566                                 return err
567                         }
568                 }
569                 ctx.LogI("rx", les, func(les LEs) string {
570                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
571                 })
572                 if !opts.DryRun {
573                         if jobPath != "" {
574                                 if opts.DoSeen {
575                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
576                                                 return err
577                                         }
578                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
579                                                 fd.Close()
580                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
581                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
582                                                                 return fmt.Sprintf(
583                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
584                                                                         sender.Name, pktName,
585                                                                         humanize.IBytes(pktSize),
586                                                                         filepath.Base(jobPath),
587                                                                 )
588                                                         })
589                                                         return err
590                                                 }
591                                         }
592                                 }
593                                 if err = os.Remove(jobPath); err != nil {
594                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
595                                                 return fmt.Sprintf(
596                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
597                                                         sender.Name, pktName,
598                                                         humanize.IBytes(pktSize), src, dst,
599                                                 )
600                                         })
601                                         return err
602                                 } else if ctx.HdrUsage {
603                                         os.Remove(JobPath2Hdr(jobPath))
604                                 }
605                         }
606                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
607                                 cmd := exec.Command(
608                                         sendmail[0],
609                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
610                                 )
611                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
612                                         "Freq from %s: %s", sender.Name, src,
613                                 ), nil)
614                                 if err = cmd.Run(); err != nil {
615                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
616                                                 return fmt.Sprintf(
617                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
618                                                         sender.Name, pktName,
619                                                         humanize.IBytes(pktSize), src, dst,
620                                                 )
621                                         })
622                                 }
623                         }
624                 }
625
626         case PktTypeTrns:
627                 if opts.NoTrns {
628                         return nil
629                 }
630                 dst := new([MTHSize]byte)
631                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
632                 nodeId := NodeId(*dst)
633                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
634                 logMsg := func(les LEs) string {
635                         return fmt.Sprintf(
636                                 "Tossing trns %s/%s (%s): %s",
637                                 sender.Name, pktName,
638                                 humanize.IBytes(pktSize),
639                                 nodeId.String(),
640                         )
641                 }
642                 node := ctx.Neigh[nodeId]
643                 if node == nil {
644                         err = errors.New("unknown node")
645                         ctx.LogE("rx-unknown", les, err, logMsg)
646                         return err
647                 }
648                 ctx.LogD("rx-tx", les, logMsg)
649                 if !opts.DryRun {
650                         if len(node.Via) == 0 {
651                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
652                                         ctx.LogE("rx", les, err, func(les LEs) string {
653                                                 return logMsg(les) + ": txing"
654                                         })
655                                         return err
656                                 }
657                         } else {
658                                 via := node.Via[:len(node.Via)-1]
659                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
660                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
661                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
662                                 if err != nil {
663                                         panic(err)
664                                 }
665                                 if _, _, _, err = ctx.Tx(
666                                         node,
667                                         pktTrns,
668                                         nice,
669                                         int64(pktSize), 0, MaxFileSize,
670                                         pipeR,
671                                         pktName,
672                                         nil,
673                                 ); err != nil {
674                                         ctx.LogE("rx", les, err, func(les LEs) string {
675                                                 return logMsg(les) + ": txing"
676                                         })
677                                         return err
678                                 }
679                         }
680                 }
681                 ctx.LogI("rx", les, func(les LEs) string {
682                         return fmt.Sprintf(
683                                 "Got transitional packet from %s to %s (%s)",
684                                 sender.Name,
685                                 ctx.NodeName(&nodeId),
686                                 humanize.IBytes(pktSize),
687                         )
688                 })
689                 if !opts.DryRun && jobPath != "" {
690                         if opts.DoSeen {
691                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
692                                         return err
693                                 }
694                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
695                                         fd.Close()
696                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
697                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
698                                                         return fmt.Sprintf(
699                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
700                                                                 sender.Name, pktName,
701                                                                 humanize.IBytes(pktSize),
702                                                                 filepath.Base(jobPath),
703                                                         )
704                                                 })
705                                                 return err
706                                         }
707                                 }
708                         }
709                         if err = os.Remove(jobPath); err != nil {
710                                 ctx.LogE("rx", les, err, func(les LEs) string {
711                                         return fmt.Sprintf(
712                                                 "Tossing trns %s/%s (%s): %s: removing",
713                                                 sender.Name, pktName,
714                                                 humanize.IBytes(pktSize),
715                                                 ctx.NodeName(&nodeId),
716                                         )
717                                 })
718                                 return err
719                         } else if ctx.HdrUsage {
720                                 os.Remove(JobPath2Hdr(jobPath))
721                         }
722                 }
723
724         case PktTypeArea:
725                 if opts.NoArea {
726                         return nil
727                 }
728                 areaId := new(AreaId)
729                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
730                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
731                 logMsg := func(les LEs) string {
732                         return fmt.Sprintf(
733                                 "Tossing %s/%s (%s): area %s",
734                                 sender.Name, pktName,
735                                 humanize.IBytes(pktSize),
736                                 ctx.AreaName(areaId),
737                         )
738                 }
739                 area := ctx.AreaId2Area[*areaId]
740                 if area == nil {
741                         err = errors.New("unknown area")
742                         ctx.LogE("rx-area-unknown", les, err, logMsg)
743                         return err
744                 }
745                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
746                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
747                 if err != nil {
748                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
749                         return err
750                 }
751                 msgHashRaw := blake2b.Sum256(pktEncRaw)
752                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
753                 les = append(les, LE{"AreaMsg", msgHash})
754                 ctx.LogD("rx-area", les, logMsg)
755
756                 if opts.DryRun {
757                         for _, nodeId := range area.Subs {
758                                 node := ctx.Neigh[*nodeId]
759                                 lesEcho := append(les, LE{"Echo", nodeId})
760                                 seenDir := filepath.Join(
761                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
762                                 )
763                                 seenPath := filepath.Join(seenDir, msgHash)
764                                 logMsgNode := func(les LEs) string {
765                                         return fmt.Sprintf(
766                                                 "%s: echoing to: %s", logMsg(les), node.Name,
767                                         )
768                                 }
769                                 if _, err := os.Stat(seenPath); err == nil {
770                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
771                                                 return logMsgNode(les) + ": already sent"
772                                         })
773                                         continue
774                                 }
775                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
776                         }
777                 } else {
778                         for _, nodeId := range area.Subs {
779                                 node := ctx.Neigh[*nodeId]
780                                 lesEcho := append(les, LE{"Echo", nodeId})
781                                 seenDir := filepath.Join(
782                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
783                                 )
784                                 seenPath := filepath.Join(seenDir, msgHash)
785                                 logMsgNode := func(les LEs) string {
786                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
787                                 }
788                                 if _, err := os.Stat(seenPath); err == nil {
789                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
790                                                 return logMsgNode(les) + ": already sent"
791                                         })
792                                         continue
793                                 }
794                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
795                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
796                                         if _, _, _, err = ctx.Tx(
797                                                 node,
798                                                 &pkt,
799                                                 nice,
800                                                 int64(pktSize), 0, MaxFileSize,
801                                                 fullPipeR,
802                                                 pktName,
803                                                 nil,
804                                         ); err != nil {
805                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
806                                                 return err
807                                         }
808                                 }
809                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
810                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
811                                         return err
812                                 }
813                                 if fd, err := os.Create(seenPath); err == nil {
814                                         fd.Close()
815                                         if err = DirSync(seenDir); err != nil {
816                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
817                                                 return err
818                                         }
819                                 } else {
820                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
821                                         return err
822                                 }
823                                 return JobRepeatProcess
824                         }
825                 }
826
827                 seenDir := filepath.Join(
828                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
829                 )
830                 seenPath := filepath.Join(seenDir, msgHash)
831                 if _, err := os.Stat(seenPath); err == nil {
832                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
833                                 return logMsg(les) + ": already seen"
834                         })
835                         if !opts.DryRun && jobPath != "" {
836                                 if err = os.Remove(jobPath); err != nil {
837                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
838                                                 return fmt.Sprintf(
839                                                         "Tossing area %s/%s (%s): %s: removing",
840                                                         sender.Name, pktName,
841                                                         humanize.IBytes(pktSize),
842                                                         msgHash,
843                                                 )
844                                         })
845                                         return err
846                                 } else if ctx.HdrUsage {
847                                         os.Remove(JobPath2Hdr(jobPath))
848                                 }
849                         }
850                         return nil
851                 }
852
853                 if area.Prv == nil {
854                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
855                                 return logMsg(les) + ": no private key for decoding"
856                         })
857                 } else {
858                         signatureVerify := true
859                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
860                                 if !area.AllowUnknown {
861                                         err = errors.New("unknown sender")
862                                         ctx.LogE(
863                                                 "rx-area-unknown",
864                                                 append(les, LE{"Sender", pktEnc.Sender}),
865                                                 err,
866                                                 func(les LEs) string {
867                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
868                                                 },
869                                         )
870                                         return err
871                                 }
872                                 signatureVerify = false
873                         }
874                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
875                         copy(areaNodeOur.Id[:], area.Id[:])
876                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
877                         areaNode := Node{
878                                 Id:       new(NodeId),
879                                 Name:     area.Name,
880                                 Incoming: area.Incoming,
881                                 Exec:     area.Exec,
882                         }
883                         copy(areaNode.Id[:], area.Id[:])
884                         pktName := fmt.Sprintf(
885                                 "area/%s/%s",
886                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
887                         )
888
889                         pipeR, pipeW := io.Pipe()
890                         errs := make(chan error, 1)
891                         go func() {
892                                 errs <- jobProcess(
893                                         ctx,
894                                         pipeR,
895                                         pktName,
896                                         les,
897                                         &areaNode,
898                                         nice,
899                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
900                                         "",
901                                         decompressor,
902                                         opts,
903                                 )
904                         }()
905                         _, _, _, err = PktEncRead(
906                                 &areaNodeOur,
907                                 ctx.Neigh,
908                                 fullPipeR,
909                                 pipeW,
910                                 signatureVerify,
911                                 nil,
912                         )
913                         if err != nil {
914                                 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
915                                 pipeW.CloseWithError(err)
916                                 <-errs
917                                 return err
918                         }
919                         pipeW.Close()
920                         if err = <-errs; err != nil {
921                                 return err
922                         }
923                 }
924
925                 if !opts.DryRun && jobPath != "" {
926                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
927                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
928                                 return err
929                         }
930                         if fd, err := os.Create(seenPath); err == nil {
931                                 fd.Close()
932                                 if err = DirSync(seenDir); err != nil {
933                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
934                                         return err
935                                 }
936                         }
937                         if err = os.Remove(jobPath); err != nil {
938                                 ctx.LogE("rx", les, err, func(les LEs) string {
939                                         return fmt.Sprintf(
940                                                 "Tossing area %s/%s (%s): %s: removing",
941                                                 sender.Name, pktName,
942                                                 humanize.IBytes(pktSize),
943                                                 msgHash,
944                                         )
945                                 })
946                                 return err
947                         } else if ctx.HdrUsage {
948                                 os.Remove(JobPath2Hdr(jobPath))
949                         }
950                 }
951
952         case PktTypeACK:
953                 if opts.NoACK {
954                         return nil
955                 }
956                 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
957                 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
958                 logMsg := func(les LEs) string {
959                         return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
960                 }
961                 ctx.LogD("rx-ack", les, logMsg)
962                 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
963                 if _, err := os.Stat(pktPath); err == nil {
964                         if !opts.DryRun {
965                                 if err = os.Remove(pktPath); err != nil {
966                                         ctx.LogE("rx-ack", les, err, func(les LEs) string {
967                                                 return logMsg(les) + ": removing packet"
968                                         })
969                                         return err
970                                 } else if ctx.HdrUsage {
971                                         os.Remove(JobPath2Hdr(pktPath))
972                                 }
973                         }
974                 } else {
975                         ctx.LogD("rx-ack", les, func(les LEs) string {
976                                 return logMsg(les) + ": already disappeared"
977                         })
978                 }
979                 if !opts.DryRun && opts.DoSeen {
980                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
981                                 return err
982                         }
983                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
984                                 fd.Close()
985                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
986                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
987                                                 return fmt.Sprintf(
988                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
989                                                         sender.Name, pktName,
990                                                         humanize.IBytes(pktSize),
991                                                         filepath.Base(jobPath),
992                                                 )
993                                         })
994                                         return err
995                                 }
996                         }
997                 }
998                 if !opts.DryRun {
999                         if err = os.Remove(jobPath); err != nil {
1000                                 ctx.LogE("rx", les, err, func(les LEs) string {
1001                                         return logMsg(les) + ": removing job"
1002                                 })
1003                                 return err
1004                         } else if ctx.HdrUsage {
1005                                 os.Remove(JobPath2Hdr(jobPath))
1006                         }
1007                 }
1008                 ctx.LogI("rx", les, func(les LEs) string {
1009                         return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
1010                 })
1011
1012         default:
1013                 err = errors.New("unknown type")
1014                 ctx.LogE(
1015                         "rx-type-unknown", les, err,
1016                         func(les LEs) string {
1017                                 return fmt.Sprintf(
1018                                         "Tossing %s/%s (%s)",
1019                                         sender.Name, pktName, humanize.IBytes(pktSize),
1020                                 )
1021                         },
1022                 )
1023                 return err
1024         }
1025         return nil
1026 }
1027
1028 func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
1029         dirLock, err := ctx.LockDir(nodeId, "toss")
1030         if err != nil {
1031                 return false
1032         }
1033         defer ctx.UnlockDir(dirLock)
1034         isBad := false
1035         decompressor, err := zstd.NewReader(nil)
1036         if err != nil {
1037                 panic(err)
1038         }
1039         defer decompressor.Close()
1040         for job := range ctx.Jobs(nodeId, xx) {
1041                 pktName := filepath.Base(job.Path)
1042                 les := LEs{
1043                         {"Node", job.PktEnc.Sender},
1044                         {"Pkt", pktName},
1045                         {"Nice", int(job.PktEnc.Nice)},
1046                 }
1047                 if job.PktEnc.Nice > opts.Nice {
1048                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
1049                                 return fmt.Sprintf(
1050                                         "Tossing %s/%s: too nice: %s",
1051                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1052                                         NicenessFmt(job.PktEnc.Nice),
1053                                 )
1054                         })
1055                         continue
1056                 }
1057                 fd, err := os.Open(job.Path)
1058                 if err != nil {
1059                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1060                                 return fmt.Sprintf(
1061                                         "Tossing %s/%s: opening %s",
1062                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
1063                                 )
1064                         })
1065                         isBad = true
1066                         continue
1067                 }
1068                 sender := ctx.Neigh[*job.PktEnc.Sender]
1069                 if sender == nil {
1070                         err := errors.New("unknown node")
1071                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1072                                 return fmt.Sprintf(
1073                                         "Tossing %s/%s",
1074                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1075                                 )
1076                         })
1077                         isBad = true
1078                         continue
1079                 }
1080                 errs := make(chan error, 1)
1081                 var sharedKey []byte
1082         Retry:
1083                 pipeR, pipeW := io.Pipe()
1084                 go func() {
1085                         errs <- jobProcess(
1086                                 ctx,
1087                                 pipeR,
1088                                 pktName,
1089                                 les,
1090                                 sender,
1091                                 job.PktEnc.Nice,
1092                                 uint64(pktSizeWithoutEnc(job.Size)),
1093                                 job.Path,
1094                                 decompressor,
1095                                 opts,
1096                         )
1097                 }()
1098                 pipeWB := bufio.NewWriter(pipeW)
1099                 sharedKey, _, _, err = PktEncRead(
1100                         ctx.Self,
1101                         ctx.Neigh,
1102                         bufio.NewReaderSize(fd, MTHBlockSize),
1103                         pipeWB,
1104                         sharedKey == nil,
1105                         sharedKey,
1106                 )
1107                 if err != nil {
1108                         pipeW.CloseWithError(err)
1109                 }
1110                 if err := pipeWB.Flush(); err != nil {
1111                         pipeW.CloseWithError(err)
1112                 }
1113                 pipeW.Close()
1114
1115                 if err != nil {
1116                         isBad = true
1117                         fd.Close()
1118                         <-errs
1119                         continue
1120                 }
1121                 if err = <-errs; err == JobRepeatProcess {
1122                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1123                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1124                                         return fmt.Sprintf(
1125                                                 "Tossing %s/%s: can not seek",
1126                                                 ctx.NodeName(job.PktEnc.Sender),
1127                                                 pktName,
1128                                         )
1129                                 })
1130                                 isBad = true
1131                                 break
1132                         }
1133                         goto Retry
1134                 } else if err != nil {
1135                         isBad = true
1136                 }
1137                 fd.Close()
1138         }
1139         return isBad
1140 }
1141
1142 func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
1143         dw, err := ctx.NewDirWatcher(
1144                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1145                 time.Second,
1146         )
1147         if err != nil {
1148                 log.Fatalln(err)
1149         }
1150         finish := make(chan struct{})
1151         badCode := make(chan bool)
1152         go func() {
1153                 bad := false
1154                 for {
1155                         select {
1156                         case <-finish:
1157                                 dw.Close()
1158                                 badCode <- bad
1159                                 return
1160                         case <-dw.C:
1161                                 bad = !ctx.Toss(nodeId, TRx, opts) || bad
1162                         }
1163                 }
1164         }()
1165         return finish, badCode
1166 }