]> Cypherpunks.ru repositories - nncp.git/blob - src/toss.go
Note about buildability with 1.22
[nncp.git] / src / toss.go
1 // NNCP -- Node to Node copy, utilities for store-and-forward data exchange
2 // Copyright (C) 2016-2024 Sergey Matveev <stargrave@stargrave.org>
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, version 3 of the License.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
16 package nncp
17
18 import (
19         "bufio"
20         "bytes"
21         "encoding/base64"
22         "errors"
23         "fmt"
24         "io"
25         "io/fs"
26         "log"
27         "mime"
28         "os"
29         "os/exec"
30         "path"
31         "path/filepath"
32         "strconv"
33         "strings"
34         "time"
35
36         xdr "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "github.com/klauspost/compress/zstd"
39         "golang.org/x/crypto/blake2b"
40         "golang.org/x/crypto/poly1305"
41 )
42
// Names of the marker subdirectories used while tossing packets:
//
//   - SeenDir is the subdirectory, placed next to a job file, whose entries
//     are named after that job (see jobPath2Seen).
//   - ACKDir is the subdirectory of a node's Tx spool directory in which
//     jobProcess creates an empty file named after every ACK packet it
//     generates.
const (
        SeenDir = "seen"
        ACKDir  = "ack"
)
47
// TossOpts groups the switches that control how received packets are tossed.
//
//   - Nice: NOTE(review): presumably the niceness threshold for packets to
//     process; it is not read in the visible part of this file -- confirm.
//   - DryRun: when set, jobProcess only logs what it would do: handlers are
//     not run, files are not written, file requests are not transmitted,
//     jobs are not removed and no notifications are sent.
//   - DoSeen: when set, an empty marker file is created under SeenDir
//     before the processed job is removed.
//   - NoFile, NoFreq, NoExec: when set, jobProcess returns nil for file,
//     file request and exec packets respectively, without handling them.
//   - NoTrns, NoArea, NoACK: NOTE(review): presumably the same skip switches
//     for transit, area and ACK packets; their use is not visible here --
//     confirm.
//   - GenACK: when set, an ACK is generated (ctx.TxACK) for every packet
//     that is not itself an ACK, and recorded under ACKDir.
type TossOpts struct {
        Nice   uint8
        DryRun bool
        DoSeen bool
        NoFile bool
        NoFreq bool
        NoExec bool
        NoTrns bool
        NoArea bool
        NoACK  bool
        GenACK bool
}
60
61 func jobPath2Seen(jobPath string) string {
62         return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath))
63 }
64
65 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
66         lines := []string{
67                 "From: " + fromTo.From,
68                 "To: " + fromTo.To,
69                 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
70         }
71         if len(body) > 0 {
72                 lines = append(
73                         lines,
74                         "MIME-Version: 1.0",
75                         "Content-Type: text/plain; charset=utf-8",
76                         "Content-Transfer-Encoding: base64",
77                         "",
78                         base64.StdEncoding.EncodeToString(body),
79                 )
80         }
81         return strings.NewReader(strings.Join(lines, "\n"))
82 }
83
84 func pktSizeWithoutEnc(pktSize int64) int64 {
85         pktSize = pktSize - PktEncOverhead - PktOverhead - PktSizeOverhead
86         pktSizeBlocks := pktSize / (EncBlkSize + poly1305.TagSize)
87         if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
88                 pktSize -= poly1305.TagSize
89         }
90         pktSize -= pktSizeBlocks * poly1305.TagSize
91         return pktSize
92 }
93
// JobRepeatProcess is a sentinel error value with the message
// "needs processing repeat".
// NOTE(review): presumably returned to tell the caller that a job has to be
// processed once more; no user of it is visible in this part of the file --
// confirm.
var JobRepeatProcess = errors.New("needs processing repeat")
95
96 func jobProcess(
97         ctx *Ctx,
98         pipeR *io.PipeReader,
99         pktName string,
100         les LEs,
101         sender *Node,
102         nice uint8,
103         pktSize uint64,
104         jobPath string,
105         decompressor *zstd.Decoder,
106         opts *TossOpts,
107 ) error {
108         defer pipeR.Close()
109         sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
110         var pkt Pkt
111         _, err := xdr.Unmarshal(pipeR, &pkt)
112         if err != nil {
113                 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
114                         return fmt.Sprintf("Tossing %s/%s: unmarshal", sender.Name, pktName)
115                 })
116                 return err
117         }
118         les = append(les, LE{"Size", int64(pktSize)})
119         ctx.LogD("rx", les, func(les LEs) string {
120                 return fmt.Sprintf(
121                         "Tossing %s/%s (%s)",
122                         sender.Name, pktName,
123                         humanize.IBytes(pktSize),
124                 )
125         })
126         if opts.GenACK && pkt.Type != PktTypeACK {
127                 newPktName, err := ctx.TxACK(
128                         sender, sender.ACKNice, pktName, sender.ACKMinSize,
129                 )
130                 if err != nil {
131                         ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
132                                 return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName)
133                         })
134                         return err
135                 }
136                 ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir)
137                 os.MkdirAll(ackDir, os.FileMode(0777))
138                 if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil {
139                         fd.Close()
140                         if err = DirSync(ackDir); err != nil {
141                                 ctx.LogE("rx-genack", les, err, func(les LEs) string {
142                                         return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
143                                 })
144                                 return err
145                         }
146                 } else {
147                         ctx.LogE("rx-genack", les, err, func(les LEs) string {
148                                 return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName)
149                         })
150                         return err
151                 }
152         }
153         switch pkt.Type {
154         case PktTypeExec, PktTypeExecFat:
155                 if opts.NoExec {
156                         return nil
157                 }
158                 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
159                 handle := string(path[0])
160                 args := make([]string, 0, len(path)-1)
161                 for _, p := range path[1:] {
162                         args = append(args, string(p))
163                 }
164                 argsStr := strings.Join(append([]string{handle}, args...), " ")
165                 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
166                 cmdline := sender.Exec[handle]
167                 if len(cmdline) == 0 {
168                         err = errors.New("No handle found")
169                         ctx.LogE(
170                                 "rx-no-handle", les, err,
171                                 func(les LEs) string {
172                                         return fmt.Sprintf(
173                                                 "Tossing exec %s/%s (%s): %s",
174                                                 sender.Name, pktName,
175                                                 humanize.IBytes(pktSize), argsStr,
176                                         )
177                                 },
178                         )
179                         return err
180                 }
181                 if pkt.Type == PktTypeExec {
182                         if err = decompressor.Reset(pipeR); err != nil {
183                                 log.Fatalln(err)
184                         }
185                 }
186                 if !opts.DryRun {
187                         cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
188                         cmd.Env = append(
189                                 cmd.Env,
190                                 "NNCP_SELF="+ctx.Self.Id.String(),
191                                 "NNCP_SENDER="+sender.Id.String(),
192                                 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
193                         )
194                         if pkt.Type == PktTypeExec {
195                                 cmd.Stdin = decompressor
196                         } else {
197                                 cmd.Stdin = pipeR
198                         }
199                         output, err := cmd.CombinedOutput()
200                         if err != nil {
201                                 les = append(les, LE{"Output", strings.Split(
202                                         strings.Trim(string(output), "\n"), "\n"),
203                                 })
204                                 ctx.LogE("rx-handle", les, err, func(les LEs) string {
205                                         return fmt.Sprintf(
206                                                 "Tossing exec %s/%s (%s): %s: handling",
207                                                 sender.Name, pktName,
208                                                 humanize.IBytes(uint64(pktSize)), argsStr,
209                                         )
210                                 })
211                                 return err
212                         }
213                         if len(sendmail) > 0 && ctx.NotifyExec != nil {
214                                 notify := ctx.NotifyExec[sender.Name+"."+handle]
215                                 if notify == nil {
216                                         notify = ctx.NotifyExec["*."+handle]
217                                 }
218                                 if notify != nil {
219                                         cmd := exec.Command(
220                                                 sendmail[0],
221                                                 append(sendmail[1:], notify.To)...,
222                                         )
223                                         cmd.Stdin = newNotification(notify, fmt.Sprintf(
224                                                 "Exec from %s: %s", sender.Name, argsStr,
225                                         ), output)
226                                         if err = cmd.Run(); err != nil {
227                                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
228                                                         return fmt.Sprintf(
229                                                                 "Tossing exec %s/%s (%s): %s: notifying",
230                                                                 sender.Name, pktName,
231                                                                 humanize.IBytes(pktSize), argsStr,
232                                                         )
233                                                 })
234                                         }
235                                 }
236                         }
237                 }
238                 ctx.LogI("rx", les, func(les LEs) string {
239                         return fmt.Sprintf(
240                                 "Got exec from %s to %s (%s)",
241                                 sender.Name, argsStr,
242                                 humanize.IBytes(pktSize),
243                         )
244                 })
245                 if !opts.DryRun && jobPath != "" {
246                         if opts.DoSeen {
247                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
248                                         return err
249                                 }
250                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
251                                         fd.Close()
252                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
253                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
254                                                         return fmt.Sprintf(
255                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
256                                                                 sender.Name, pktName,
257                                                                 humanize.IBytes(pktSize),
258                                                                 filepath.Base(jobPath),
259                                                         )
260                                                 })
261                                                 return err
262                                         }
263                                 }
264                         }
265                         if err = os.Remove(jobPath); err != nil {
266                                 ctx.LogE("rx-notify", les, err, func(les LEs) string {
267                                         return fmt.Sprintf(
268                                                 "Tossing exec %s/%s (%s): %s: notifying",
269                                                 sender.Name, pktName,
270                                                 humanize.IBytes(pktSize), argsStr,
271                                         )
272                                 })
273                                 return err
274                         } else if ctx.HdrUsage {
275                                 os.Remove(JobPath2Hdr(jobPath))
276                         }
277                 }
278
279         case PktTypeFile:
280                 if opts.NoFile {
281                         return nil
282                 }
283                 dst := string(pkt.Path[:int(pkt.PathLen)])
284                 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
285                 if filepath.IsAbs(dst) {
286                         err = errors.New("non-relative destination path")
287                         ctx.LogE(
288                                 "rx-non-rel", les, err,
289                                 func(les LEs) string {
290                                         return fmt.Sprintf(
291                                                 "Tossing file %s/%s (%s): %s",
292                                                 sender.Name, pktName,
293                                                 humanize.IBytes(pktSize), dst,
294                                         )
295                                 },
296                         )
297                         return err
298                 }
299                 incoming := sender.Incoming
300                 if incoming == nil {
301                         err = errors.New("incoming is not allowed")
302                         ctx.LogE(
303                                 "rx-no-incoming", les, err,
304                                 func(les LEs) string {
305                                         return fmt.Sprintf(
306                                                 "Tossing file %s/%s (%s): %s",
307                                                 sender.Name, pktName,
308                                                 humanize.IBytes(pktSize), dst,
309                                         )
310                                 },
311                         )
312                         return err
313                 }
314                 dir := filepath.Join(*incoming, path.Dir(dst))
315                 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
316                         ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
317                                 return fmt.Sprintf(
318                                         "Tossing file %s/%s (%s): %s: mkdir",
319                                         sender.Name, pktName,
320                                         humanize.IBytes(pktSize), dst,
321                                 )
322                         })
323                         return err
324                 }
325                 if !opts.DryRun {
326                         tmp, err := TempFile(dir, "file")
327                         if err != nil {
328                                 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
329                                         return fmt.Sprintf(
330                                                 "Tossing file %s/%s (%s): %s: mktemp",
331                                                 sender.Name, pktName,
332                                                 humanize.IBytes(pktSize), dst,
333                                         )
334                                 })
335                                 return err
336                         }
337                         les = append(les, LE{"Tmp", tmp.Name()})
338                         ctx.LogD("rx-tmp-created", les, func(les LEs) string {
339                                 return fmt.Sprintf(
340                                         "Tossing file %s/%s (%s): %s: created: %s",
341                                         sender.Name, pktName,
342                                         humanize.IBytes(pktSize), dst, tmp.Name(),
343                                 )
344                         })
345                         bufW := bufio.NewWriter(tmp)
346                         if _, err = CopyProgressed(
347                                 bufW, pipeR, "Rx file",
348                                 append(les, LE{"FullSize", int64(pktSize)}),
349                                 ctx.ShowPrgrs,
350                         ); err != nil {
351                                 ctx.LogE("rx-copy", les, err, func(les LEs) string {
352                                         return fmt.Sprintf(
353                                                 "Tossing file %s/%s (%s): %s: copying",
354                                                 sender.Name, pktName,
355                                                 humanize.IBytes(pktSize), dst,
356                                         )
357                                 })
358                                 return err
359                         }
360                         if err = bufW.Flush(); err != nil {
361                                 tmp.Close()
362                                 ctx.LogE("rx-flush", les, err, func(les LEs) string {
363                                         return fmt.Sprintf(
364                                                 "Tossing file %s/%s (%s): %s: flushing",
365                                                 sender.Name, pktName,
366                                                 humanize.IBytes(pktSize), dst,
367                                         )
368                                 })
369                                 return err
370                         }
371                         if !NoSync {
372                                 if err = tmp.Sync(); err != nil {
373                                         tmp.Close()
374                                         ctx.LogE("rx-sync", les, err, func(les LEs) string {
375                                                 return fmt.Sprintf(
376                                                         "Tossing file %s/%s (%s): %s: syncing",
377                                                         sender.Name, pktName,
378                                                         humanize.IBytes(pktSize), dst,
379                                                 )
380                                         })
381                                         return err
382                                 }
383                         }
384                         if err = tmp.Close(); err != nil {
385                                 ctx.LogE("rx-close", les, err, func(les LEs) string {
386                                         return fmt.Sprintf(
387                                                 "Tossing file %s/%s (%s): %s: closing",
388                                                 sender.Name, pktName,
389                                                 humanize.IBytes(pktSize), dst,
390                                         )
391                                 })
392                                 return err
393                         }
394                         dstPathOrig := filepath.Join(*incoming, dst)
395                         dstPath := dstPathOrig
396                         dstPathCtr := 0
397                         for {
398                                 if _, err = os.Stat(dstPath); err != nil {
399                                         if errors.Is(err, fs.ErrNotExist) {
400                                                 break
401                                         }
402                                         ctx.LogE("rx-stat", les, err, func(les LEs) string {
403                                                 return fmt.Sprintf(
404                                                         "Tossing file %s/%s (%s): %s: stating: %s",
405                                                         sender.Name, pktName,
406                                                         humanize.IBytes(pktSize), dst, dstPath,
407                                                 )
408                                         })
409                                         return err
410                                 }
411                                 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
412                                 dstPathCtr++
413                         }
414                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
415                                 ctx.LogE("rx-rename", les, err, func(les LEs) string {
416                                         return fmt.Sprintf(
417                                                 "Tossing file %s/%s (%s): %s: renaming",
418                                                 sender.Name, pktName,
419                                                 humanize.IBytes(pktSize), dst,
420                                         )
421                                 })
422                                 return err
423                         }
424                         if err = DirSync(*incoming); err != nil {
425                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
426                                         return fmt.Sprintf(
427                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
428                                                 sender.Name, pktName,
429                                                 humanize.IBytes(pktSize), dst,
430                                         )
431                                 })
432                                 return err
433                         }
434                         les = les[:len(les)-1] // delete Tmp
435                 }
436                 ctx.LogI("rx", les, func(les LEs) string {
437                         return fmt.Sprintf(
438                                 "Got file %s (%s) from %s",
439                                 dst, humanize.IBytes(pktSize), sender.Name,
440                         )
441                 })
442                 if !opts.DryRun {
443                         if jobPath != "" {
444                                 if opts.DoSeen {
445                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
446                                                 return err
447                                         }
448                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
449                                                 fd.Close()
450                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
451                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
452                                                                 return fmt.Sprintf(
453                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
454                                                                         sender.Name, pktName,
455                                                                         humanize.IBytes(pktSize),
456                                                                         filepath.Base(jobPath),
457                                                                 )
458                                                         })
459                                                         return err
460                                                 }
461                                         }
462                                 }
463                                 if err = os.Remove(jobPath); err != nil {
464                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
465                                                 return fmt.Sprintf(
466                                                         "Tossing file %s/%s (%s): %s: removing",
467                                                         sender.Name, pktName,
468                                                         humanize.IBytes(pktSize), dst,
469                                                 )
470                                         })
471                                         return err
472                                 } else if ctx.HdrUsage {
473                                         os.Remove(JobPath2Hdr(jobPath))
474                                 }
475                         }
476                         if len(sendmail) > 0 && ctx.NotifyFile != nil {
477                                 cmd := exec.Command(
478                                         sendmail[0],
479                                         append(sendmail[1:], ctx.NotifyFile.To)...,
480                                 )
481                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
482                                         "File from %s: %s (%s)",
483                                         sender.Name, dst, humanize.IBytes(pktSize),
484                                 ), nil)
485                                 if err = cmd.Run(); err != nil {
486                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
487                                                 return fmt.Sprintf(
488                                                         "Tossing file %s/%s (%s): %s: notifying",
489                                                         sender.Name, pktName,
490                                                         humanize.IBytes(pktSize), dst,
491                                                 )
492                                         })
493                                 }
494                         }
495                 }
496
497         case PktTypeFreq:
498                 if opts.NoFreq {
499                         return nil
500                 }
501                 src := string(pkt.Path[:int(pkt.PathLen)])
502                 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
503                 if filepath.IsAbs(src) {
504                         err = errors.New("non-relative source path")
505                         ctx.LogE(
506                                 "rx-non-rel", les, err,
507                                 func(les LEs) string {
508                                         return fmt.Sprintf(
509                                                 "Tossing freq %s/%s (%s): %s: notifying",
510                                                 sender.Name, pktName,
511                                                 humanize.IBytes(pktSize), src,
512                                         )
513                                 },
514                         )
515                         return err
516                 }
517                 dstRaw, err := io.ReadAll(pipeR)
518                 if err != nil {
519                         ctx.LogE("rx-read", les, err, func(les LEs) string {
520                                 return fmt.Sprintf(
521                                         "Tossing freq %s/%s (%s): %s: reading",
522                                         sender.Name, pktName,
523                                         humanize.IBytes(pktSize), src,
524                                 )
525                         })
526                         return err
527                 }
528                 dst := string(dstRaw)
529                 les = append(les, LE{"Dst", dst})
530                 freqPath := sender.FreqPath
531                 if freqPath == nil {
532                         err = errors.New("freqing is not allowed")
533                         ctx.LogE(
534                                 "rx-no-freq", les, err,
535                                 func(les LEs) string {
536                                         return fmt.Sprintf(
537                                                 "Tossing freq %s/%s (%s): %s -> %s",
538                                                 sender.Name, pktName,
539                                                 humanize.IBytes(pktSize), src, dst,
540                                         )
541                                 },
542                         )
543                         return err
544                 }
545                 if !opts.DryRun {
546                         err = ctx.TxFile(
547                                 sender,
548                                 pkt.Nice,
549                                 filepath.Join(*freqPath, src),
550                                 dst,
551                                 sender.FreqChunked,
552                                 sender.FreqMinSize,
553                                 sender.FreqMaxSize,
554                                 nil,
555                         )
556                         if err != nil {
557                                 ctx.LogE("rx-tx", les, err, func(les LEs) string {
558                                         return fmt.Sprintf(
559                                                 "Tossing freq %s/%s (%s): %s -> %s: txing",
560                                                 sender.Name, pktName,
561                                                 humanize.IBytes(pktSize), src, dst,
562                                         )
563                                 })
564                                 return err
565                         }
566                 }
567                 ctx.LogI("rx", les, func(les LEs) string {
568                         return fmt.Sprintf("Got file request %s to %s", src, sender.Name)
569                 })
570                 if !opts.DryRun {
571                         if jobPath != "" {
572                                 if opts.DoSeen {
573                                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
574                                                 return err
575                                         }
576                                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
577                                                 fd.Close()
578                                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
579                                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
580                                                                 return fmt.Sprintf(
581                                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
582                                                                         sender.Name, pktName,
583                                                                         humanize.IBytes(pktSize),
584                                                                         filepath.Base(jobPath),
585                                                                 )
586                                                         })
587                                                         return err
588                                                 }
589                                         }
590                                 }
591                                 if err = os.Remove(jobPath); err != nil {
592                                         ctx.LogE("rx-remove", les, err, func(les LEs) string {
593                                                 return fmt.Sprintf(
594                                                         "Tossing freq %s/%s (%s): %s -> %s: removing",
595                                                         sender.Name, pktName,
596                                                         humanize.IBytes(pktSize), src, dst,
597                                                 )
598                                         })
599                                         return err
600                                 } else if ctx.HdrUsage {
601                                         os.Remove(JobPath2Hdr(jobPath))
602                                 }
603                         }
604                         if len(sendmail) > 0 && ctx.NotifyFreq != nil {
605                                 cmd := exec.Command(
606                                         sendmail[0],
607                                         append(sendmail[1:], ctx.NotifyFreq.To)...,
608                                 )
609                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
610                                         "Freq from %s: %s", sender.Name, src,
611                                 ), nil)
612                                 if err = cmd.Run(); err != nil {
613                                         ctx.LogE("rx-notify", les, err, func(les LEs) string {
614                                                 return fmt.Sprintf(
615                                                         "Tossing freq %s/%s (%s): %s -> %s: notifying",
616                                                         sender.Name, pktName,
617                                                         humanize.IBytes(pktSize), src, dst,
618                                                 )
619                                         })
620                                 }
621                         }
622                 }
623
624         case PktTypeTrns:
625                 if opts.NoTrns {
626                         return nil
627                 }
628                 dst := new([MTHSize]byte)
629                 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
630                 nodeId := NodeId(*dst)
631                 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
632                 logMsg := func(les LEs) string {
633                         return fmt.Sprintf(
634                                 "Tossing trns %s/%s (%s): %s",
635                                 sender.Name, pktName,
636                                 humanize.IBytes(pktSize),
637                                 nodeId.String(),
638                         )
639                 }
640                 node := ctx.Neigh[nodeId]
641                 if node == nil {
642                         err = errors.New("unknown node")
643                         ctx.LogE("rx-unknown", les, err, logMsg)
644                         return err
645                 }
646                 ctx.LogD("rx-tx", les, logMsg)
647                 if !opts.DryRun {
648                         if len(node.Via) == 0 {
649                                 if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil {
650                                         ctx.LogE("rx", les, err, func(les LEs) string {
651                                                 return logMsg(les) + ": txing"
652                                         })
653                                         return err
654                                 }
655                         } else {
656                                 via := node.Via[:len(node.Via)-1]
657                                 node = ctx.Neigh[*node.Via[len(node.Via)-1]]
658                                 node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub}
659                                 pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:])
660                                 if err != nil {
661                                         panic(err)
662                                 }
663                                 if _, _, _, err = ctx.Tx(
664                                         node,
665                                         pktTrns,
666                                         nice,
667                                         int64(pktSize), 0, MaxFileSize,
668                                         pipeR,
669                                         pktName,
670                                         nil,
671                                 ); err != nil {
672                                         ctx.LogE("rx", les, err, func(les LEs) string {
673                                                 return logMsg(les) + ": txing"
674                                         })
675                                         return err
676                                 }
677                         }
678                 }
679                 ctx.LogI("rx", les, func(les LEs) string {
680                         return fmt.Sprintf(
681                                 "Got transitional packet from %s to %s (%s)",
682                                 sender.Name,
683                                 ctx.NodeName(&nodeId),
684                                 humanize.IBytes(pktSize),
685                         )
686                 })
687                 if !opts.DryRun && jobPath != "" {
688                         if opts.DoSeen {
689                                 if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
690                                         return err
691                                 }
692                                 if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
693                                         fd.Close()
694                                         if err = DirSync(filepath.Dir(jobPath)); err != nil {
695                                                 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
696                                                         return fmt.Sprintf(
697                                                                 "Tossing file %s/%s (%s): %s: dirsyncing",
698                                                                 sender.Name, pktName,
699                                                                 humanize.IBytes(pktSize),
700                                                                 filepath.Base(jobPath),
701                                                         )
702                                                 })
703                                                 return err
704                                         }
705                                 }
706                         }
707                         if err = os.Remove(jobPath); err != nil {
708                                 ctx.LogE("rx", les, err, func(les LEs) string {
709                                         return fmt.Sprintf(
710                                                 "Tossing trns %s/%s (%s): %s: removing",
711                                                 sender.Name, pktName,
712                                                 humanize.IBytes(pktSize),
713                                                 ctx.NodeName(&nodeId),
714                                         )
715                                 })
716                                 return err
717                         } else if ctx.HdrUsage {
718                                 os.Remove(JobPath2Hdr(jobPath))
719                         }
720                 }
721
722         case PktTypeArea:
723                 if opts.NoArea {
724                         return nil
725                 }
726                 areaId := new(AreaId)
727                 copy(areaId[:], pkt.Path[:int(pkt.PathLen)])
728                 les := append(les, LE{"Type", "area"}, LE{"Area", areaId})
729                 logMsg := func(les LEs) string {
730                         return fmt.Sprintf(
731                                 "Tossing %s/%s (%s): area %s",
732                                 sender.Name, pktName,
733                                 humanize.IBytes(pktSize),
734                                 ctx.AreaName(areaId),
735                         )
736                 }
737                 area := ctx.AreaId2Area[*areaId]
738                 if area == nil {
739                         err = errors.New("unknown area")
740                         ctx.LogE("rx-area-unknown", les, err, logMsg)
741                         return err
742                 }
743                 pktEnc, pktEncRaw, err := ctx.HdrRead(pipeR)
744                 fullPipeR := io.MultiReader(bytes.NewReader(pktEncRaw), pipeR)
745                 if err != nil {
746                         ctx.LogE("rx-area-pkt-enc-read", les, err, logMsg)
747                         return err
748                 }
749                 msgHashRaw := blake2b.Sum256(pktEncRaw)
750                 msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
751                 les = append(les, LE{"AreaMsg", msgHash})
752                 ctx.LogD("rx-area", les, logMsg)
753
754                 if opts.DryRun {
755                         for _, nodeId := range area.Subs {
756                                 node := ctx.Neigh[*nodeId]
757                                 lesEcho := append(les, LE{"Echo", nodeId})
758                                 seenDir := filepath.Join(
759                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
760                                 )
761                                 seenPath := filepath.Join(seenDir, msgHash)
762                                 logMsgNode := func(les LEs) string {
763                                         return fmt.Sprintf(
764                                                 "%s: echoing to: %s", logMsg(les), node.Name,
765                                         )
766                                 }
767                                 if _, err := os.Stat(seenPath); err == nil {
768                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
769                                                 return logMsgNode(les) + ": already sent"
770                                         })
771                                         continue
772                                 }
773                                 ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
774                         }
775                 } else {
776                         for _, nodeId := range area.Subs {
777                                 node := ctx.Neigh[*nodeId]
778                                 lesEcho := append(les, LE{"Echo", nodeId})
779                                 seenDir := filepath.Join(
780                                         ctx.Spool, nodeId.String(), AreaDir, area.Id.String(),
781                                 )
782                                 seenPath := filepath.Join(seenDir, msgHash)
783                                 logMsgNode := func(les LEs) string {
784                                         return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name)
785                                 }
786                                 if _, err := os.Stat(seenPath); err == nil {
787                                         ctx.LogD("rx-area-echo-seen", lesEcho, func(les LEs) string {
788                                                 return logMsgNode(les) + ": already sent"
789                                         })
790                                         continue
791                                 }
792                                 if nodeId != sender.Id && nodeId != pktEnc.Sender {
793                                         ctx.LogI("rx-area-echo", lesEcho, logMsgNode)
794                                         if _, _, _, err = ctx.Tx(
795                                                 node,
796                                                 &pkt,
797                                                 nice,
798                                                 int64(pktSize), 0, MaxFileSize,
799                                                 fullPipeR,
800                                                 pktName,
801                                                 nil,
802                                         ); err != nil {
803                                                 ctx.LogE("rx-area", lesEcho, err, logMsgNode)
804                                                 return err
805                                         }
806                                 }
807                                 if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
808                                         ctx.LogE("rx-area-mkdir", lesEcho, err, logMsgNode)
809                                         return err
810                                 }
811                                 if fd, err := os.Create(seenPath); err == nil {
812                                         fd.Close()
813                                         if err = DirSync(seenDir); err != nil {
814                                                 ctx.LogE("rx-area-dirsync", les, err, logMsgNode)
815                                                 return err
816                                         }
817                                 } else {
818                                         ctx.LogE("rx-area-touch", lesEcho, err, logMsgNode)
819                                         return err
820                                 }
821                                 return JobRepeatProcess
822                         }
823                 }
824
825                 seenDir := filepath.Join(
826                         ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(),
827                 )
828                 seenPath := filepath.Join(seenDir, msgHash)
829                 if _, err := os.Stat(seenPath); err == nil {
830                         ctx.LogD("rx-area-seen", les, func(les LEs) string {
831                                 return logMsg(les) + ": already seen"
832                         })
833                         if !opts.DryRun && jobPath != "" {
834                                 if err = os.Remove(jobPath); err != nil {
835                                         ctx.LogE("rx-area-remove", les, err, func(les LEs) string {
836                                                 return fmt.Sprintf(
837                                                         "Tossing area %s/%s (%s): %s: removing",
838                                                         sender.Name, pktName,
839                                                         humanize.IBytes(pktSize),
840                                                         msgHash,
841                                                 )
842                                         })
843                                         return err
844                                 } else if ctx.HdrUsage {
845                                         os.Remove(JobPath2Hdr(jobPath))
846                                 }
847                         }
848                         return nil
849                 }
850
851                 if area.Prv == nil {
852                         ctx.LogD("rx-area-no-prv", les, func(les LEs) string {
853                                 return logMsg(les) + ": no private key for decoding"
854                         })
855                 } else {
856                         signatureVerify := true
857                         if _, senderKnown := ctx.Neigh[*pktEnc.Sender]; !senderKnown {
858                                 if !area.AllowUnknown {
859                                         err = errors.New("unknown sender")
860                                         ctx.LogE(
861                                                 "rx-area-unknown",
862                                                 append(les, LE{"Sender", pktEnc.Sender}),
863                                                 err,
864                                                 func(les LEs) string {
865                                                         return logMsg(les) + ": sender: " + pktEnc.Sender.String()
866                                                 },
867                                         )
868                                         return err
869                                 }
870                                 signatureVerify = false
871                         }
872                         areaNodeOur := NodeOur{Id: new(NodeId), ExchPrv: new([32]byte)}
873                         copy(areaNodeOur.Id[:], area.Id[:])
874                         copy(areaNodeOur.ExchPrv[:], area.Prv[:])
875                         areaNode := Node{
876                                 Id:       new(NodeId),
877                                 Name:     area.Name,
878                                 Incoming: area.Incoming,
879                                 Exec:     area.Exec,
880                         }
881                         copy(areaNode.Id[:], area.Id[:])
882                         pktName := fmt.Sprintf(
883                                 "area/%s/%s",
884                                 Base32Codec.EncodeToString(areaId[:]), msgHash,
885                         )
886
887                         pipeR, pipeW := io.Pipe()
888                         errs := make(chan error, 1)
889                         go func() {
890                                 errs <- jobProcess(
891                                         ctx,
892                                         pipeR,
893                                         pktName,
894                                         les,
895                                         &areaNode,
896                                         nice,
897                                         uint64(pktSizeWithoutEnc(int64(pktSize))),
898                                         "",
899                                         decompressor,
900                                         opts,
901                                 )
902                         }()
903                         _, _, _, err = PktEncRead(
904                                 &areaNodeOur,
905                                 ctx.Neigh,
906                                 fullPipeR,
907                                 pipeW,
908                                 signatureVerify,
909                                 nil,
910                         )
911                         if err != nil {
912                                 ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg)
913                                 pipeW.CloseWithError(err)
914                                 <-errs
915                                 return err
916                         }
917                         pipeW.Close()
918                         if err = <-errs; err != nil {
919                                 return err
920                         }
921                 }
922
923                 if !opts.DryRun && jobPath != "" {
924                         if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
925                                 ctx.LogE("rx-area-mkdir", les, err, logMsg)
926                                 return err
927                         }
928                         if fd, err := os.Create(seenPath); err == nil {
929                                 fd.Close()
930                                 if err = DirSync(seenDir); err != nil {
931                                         ctx.LogE("rx-area-dirsync", les, err, logMsg)
932                                         return err
933                                 }
934                         }
935                         if err = os.Remove(jobPath); err != nil {
936                                 ctx.LogE("rx", les, err, func(les LEs) string {
937                                         return fmt.Sprintf(
938                                                 "Tossing area %s/%s (%s): %s: removing",
939                                                 sender.Name, pktName,
940                                                 humanize.IBytes(pktSize),
941                                                 msgHash,
942                                         )
943                                 })
944                                 return err
945                         } else if ctx.HdrUsage {
946                                 os.Remove(JobPath2Hdr(jobPath))
947                         }
948                 }
949
950         case PktTypeACK:
951                 if opts.NoACK {
952                         return nil
953                 }
954                 hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize])
955                 les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh})
956                 logMsg := func(les LEs) string {
957                         return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh)
958                 }
959                 ctx.LogD("rx-ack", les, logMsg)
960                 pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh)
961                 if _, err := os.Stat(pktPath); err == nil {
962                         if !opts.DryRun {
963                                 if err = os.Remove(pktPath); err != nil {
964                                         ctx.LogE("rx-ack", les, err, func(les LEs) string {
965                                                 return logMsg(les) + ": removing packet"
966                                         })
967                                         return err
968                                 } else if ctx.HdrUsage {
969                                         os.Remove(JobPath2Hdr(pktPath))
970                                 }
971                         }
972                 } else {
973                         ctx.LogD("rx-ack", les, func(les LEs) string {
974                                 return logMsg(les) + ": already disappeared"
975                         })
976                 }
977                 if !opts.DryRun && opts.DoSeen {
978                         if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil {
979                                 return err
980                         }
981                         if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil {
982                                 fd.Close()
983                                 if err = DirSync(filepath.Dir(jobPath)); err != nil {
984                                         ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
985                                                 return fmt.Sprintf(
986                                                         "Tossing file %s/%s (%s): %s: dirsyncing",
987                                                         sender.Name, pktName,
988                                                         humanize.IBytes(pktSize),
989                                                         filepath.Base(jobPath),
990                                                 )
991                                         })
992                                         return err
993                                 }
994                         }
995                 }
996                 if !opts.DryRun {
997                         if err = os.Remove(jobPath); err != nil {
998                                 ctx.LogE("rx", les, err, func(les LEs) string {
999                                         return logMsg(les) + ": removing job"
1000                                 })
1001                                 return err
1002                         } else if ctx.HdrUsage {
1003                                 os.Remove(JobPath2Hdr(jobPath))
1004                         }
1005                 }
1006                 ctx.LogI("rx", les, func(les LEs) string {
1007                         return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh)
1008                 })
1009
1010         default:
1011                 err = errors.New("unknown type")
1012                 ctx.LogE(
1013                         "rx-type-unknown", les, err,
1014                         func(les LEs) string {
1015                                 return fmt.Sprintf(
1016                                         "Tossing %s/%s (%s)",
1017                                         sender.Name, pktName, humanize.IBytes(pktSize),
1018                                 )
1019                         },
1020                 )
1021                 return err
1022         }
1023         return nil
1024 }
1025
1026 func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool {
1027         dirLock, err := ctx.LockDir(nodeId, "toss")
1028         if err != nil {
1029                 return false
1030         }
1031         defer ctx.UnlockDir(dirLock)
1032         isBad := false
1033         decompressor, err := zstd.NewReader(nil)
1034         if err != nil {
1035                 panic(err)
1036         }
1037         defer decompressor.Close()
1038         for job := range ctx.Jobs(nodeId, xx) {
1039                 pktName := filepath.Base(job.Path)
1040                 les := LEs{
1041                         {"Node", job.PktEnc.Sender},
1042                         {"Pkt", pktName},
1043                         {"Nice", int(job.PktEnc.Nice)},
1044                 }
1045                 if job.PktEnc.Nice > opts.Nice {
1046                         ctx.LogD("rx-too-nice", les, func(les LEs) string {
1047                                 return fmt.Sprintf(
1048                                         "Tossing %s/%s: too nice: %s",
1049                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1050                                         NicenessFmt(job.PktEnc.Nice),
1051                                 )
1052                         })
1053                         continue
1054                 }
1055                 fd, err := os.Open(job.Path)
1056                 if err != nil {
1057                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1058                                 return fmt.Sprintf(
1059                                         "Tossing %s/%s: opening %s",
1060                                         ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
1061                                 )
1062                         })
1063                         isBad = true
1064                         continue
1065                 }
1066                 sender := ctx.Neigh[*job.PktEnc.Sender]
1067                 if sender == nil {
1068                         err := errors.New("unknown node")
1069                         ctx.LogE("rx-open", les, err, func(les LEs) string {
1070                                 return fmt.Sprintf(
1071                                         "Tossing %s/%s",
1072                                         ctx.NodeName(job.PktEnc.Sender), pktName,
1073                                 )
1074                         })
1075                         isBad = true
1076                         continue
1077                 }
1078                 errs := make(chan error, 1)
1079                 var sharedKey []byte
1080         Retry:
1081                 pipeR, pipeW := io.Pipe()
1082                 go func() {
1083                         errs <- jobProcess(
1084                                 ctx,
1085                                 pipeR,
1086                                 pktName,
1087                                 les,
1088                                 sender,
1089                                 job.PktEnc.Nice,
1090                                 uint64(pktSizeWithoutEnc(job.Size)),
1091                                 job.Path,
1092                                 decompressor,
1093                                 opts,
1094                         )
1095                 }()
1096                 pipeWB := bufio.NewWriter(pipeW)
1097                 sharedKey, _, _, err = PktEncRead(
1098                         ctx.Self,
1099                         ctx.Neigh,
1100                         bufio.NewReaderSize(fd, MTHBlockSize),
1101                         pipeWB,
1102                         sharedKey == nil,
1103                         sharedKey,
1104                 )
1105                 if err != nil {
1106                         pipeW.CloseWithError(err)
1107                 }
1108                 if err := pipeWB.Flush(); err != nil {
1109                         pipeW.CloseWithError(err)
1110                 }
1111                 pipeW.Close()
1112
1113                 if err != nil {
1114                         isBad = true
1115                         fd.Close()
1116                         <-errs
1117                         continue
1118                 }
1119                 if err = <-errs; err == JobRepeatProcess {
1120                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1121                                 ctx.LogE("rx-seek", les, err, func(les LEs) string {
1122                                         return fmt.Sprintf(
1123                                                 "Tossing %s/%s: can not seek",
1124                                                 ctx.NodeName(job.PktEnc.Sender),
1125                                                 pktName,
1126                                         )
1127                                 })
1128                                 isBad = true
1129                                 break
1130                         }
1131                         goto Retry
1132                 } else if err != nil {
1133                         isBad = true
1134                 }
1135                 fd.Close()
1136         }
1137         return isBad
1138 }
1139
1140 func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) {
1141         dw, err := ctx.NewDirWatcher(
1142                 filepath.Join(ctx.Spool, nodeId.String(), string(TRx)),
1143                 time.Second,
1144         )
1145         if err != nil {
1146                 log.Fatalln(err)
1147         }
1148         finish := make(chan struct{})
1149         badCode := make(chan bool)
1150         go func() {
1151                 bad := false
1152                 for {
1153                         select {
1154                         case <-finish:
1155                                 dw.Close()
1156                                 badCode <- bad
1157                                 return
1158                         case <-dw.C:
1159                                 bad = !ctx.Toss(nodeId, TRx, opts) || bad
1160                         }
1161                 }
1162         }()
1163         return finish, badCode
1164 }