X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=c4710494bc1a56d8ba5d785f89e0ce44ba23eb29;hb=2e22bda93fdf8f2f84e4d19b3f1d46318b497139;hp=21d3007e74ecd263c86030dbc1bdf1f9e06998a4;hpb=80a2c0146ad7b72c3b9ee8f59667e66ddb5e72df;p=nncp.git diff --git a/src/toss.go b/src/toss.go index 21d3007..c471049 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2023 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,7 @@ import ( "errors" "fmt" "io" - "io/ioutil" + "io/fs" "log" "mime" "os" @@ -43,9 +43,27 @@ import ( ) const ( - SeenSuffix = ".seen" + SeenDir = "seen" + ACKDir = "ack" ) +type TossOpts struct { + Nice uint8 + DryRun bool + DoSeen bool + NoFile bool + NoFreq bool + NoExec bool + NoTrns bool + NoArea bool + NoACK bool + GenACK bool +} + +func jobPath2Seen(jobPath string) string { + return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath)) +} + func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader { lines := []string{ "From: " + fromTo.From, @@ -87,7 +105,7 @@ func jobProcess( pktSize uint64, jobPath string, decompressor *zstd.Decoder, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + opts *TossOpts, ) error { defer pipeR.Close() sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] @@ -107,9 +125,36 @@ func jobProcess( humanize.IBytes(pktSize), ) }) + if opts.GenACK && pkt.Type != PktTypeACK { + newPktName, err := ctx.TxACK( + sender, sender.ACKNice, pktName, sender.ACKMinSize, + ) + if err != nil { + ctx.LogE("rx-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName) + }) + return err + } + ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir) + os.MkdirAll(ackDir, os.FileMode(0777)) + if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil { + fd.Close() + if err = DirSync(ackDir); err != nil { + ctx.LogE("rx-genack", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName) + }) + return err + } + } else { + ctx.LogE("rx-genack", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName) + }) + return err + } + } switch pkt.Type { case PktTypeExec, PktTypeExecFat: - if noExec { + if opts.NoExec { return nil } path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0}) @@ -140,7 +185,7 @@ func jobProcess( log.Fatalln(err) } } - if !dryRun { + if !opts.DryRun { cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...) cmd.Env = append( cmd.Env, @@ -153,9 +198,12 @@ func jobProcess( } else { cmd.Stdin = pipeR } - output, err := cmd.Output() + output, err := cmd.CombinedOutput() if err != nil { - ctx.LogE("rx-hande", les, err, func(les LEs) string { + les = append(les, LE{"Output", strings.Split( + strings.Trim(string(output), "\n"), "\n"), + }) + ctx.LogE("rx-handle", les, err, func(les LEs) string { return fmt.Sprintf( "Tossing exec %s/%s (%s): %s: handling", sender.Name, pktName, @@ -196,9 +244,12 @@ func jobProcess( humanize.IBytes(pktSize), ) }) - if !dryRun && jobPath != "" { - if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if !opts.DryRun && jobPath != "" { + if opts.DoSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -223,12 +274,12 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } case PktTypeFile: - if noFile { + if opts.NoFile { return nil } dst := string(pkt.Path[:int(pkt.PathLen)]) @@ -273,7 +324,7 @@ func jobProcess( }) return err } - if !dryRun { + if !opts.DryRun { tmp, err := TempFile(dir, "file") if err != nil { ctx.LogE("rx-mktemp", les, err, func(les LEs) string { @@ -309,7 +360,7 @@ func jobProcess( return err } if err = bufW.Flush(); err != nil { - tmp.Close() // #nosec G104 + tmp.Close() ctx.LogE("rx-flush", les, err, func(les LEs) string { return fmt.Sprintf( "Tossing file %s/%s (%s): %s: flushing", @@ -319,16 +370,18 @@ func jobProcess( }) return err } - if err = tmp.Sync(); err != nil { - tmp.Close() // #nosec G104 - ctx.LogE("rx-sync", les, err, func(les LEs) string { - return fmt.Sprintf( - "Tossing file %s/%s (%s): %s: syncing", - sender.Name, pktName, - humanize.IBytes(pktSize), dst, - ) - }) - return err + if !NoSync { + if err = tmp.Sync(); err != nil { + tmp.Close() + ctx.LogE("rx-sync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: syncing", + sender.Name, pktName, + humanize.IBytes(pktSize), dst, + ) + }) + return err + } } if err = tmp.Close(); err != nil { ctx.LogE("rx-close", les, err, func(les LEs) string { @@ -345,7 +398,7 @@ func jobProcess( dstPathCtr := 0 for { if _, err = os.Stat(dstPath); err != nil { - if os.IsNotExist(err) { + if errors.Is(err, fs.ErrNotExist) { break } ctx.LogE("rx-stat", les, err, func(les LEs) string { @@ -388,10 +441,13 @@ func jobProcess( dst, humanize.IBytes(pktSize), sender.Name, ) }) - if !dryRun { + if !opts.DryRun { if jobPath != "" { - if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if opts.DoSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -416,7 +472,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } if len(sendmail) > 0 && ctx.NotifyFile != nil { @@ -441,7 +497,7 @@ func jobProcess( } case PktTypeFreq: - if noFreq { + if opts.NoFreq { return nil } src := string(pkt.Path[:int(pkt.PathLen)]) @@ -460,7 +516,7 @@ func jobProcess( ) return err } - dstRaw, err := ioutil.ReadAll(pipeR) + dstRaw, err := io.ReadAll(pipeR) if err != nil { ctx.LogE("rx-read", les, err, func(les LEs) string { return fmt.Sprintf( @@ -488,7 +544,7 @@ func jobProcess( ) return err } - if !dryRun { + if !opts.DryRun { err = ctx.TxFile( sender, pkt.Nice, @@ -513,10 +569,13 @@ func jobProcess( ctx.LogI("rx", les, func(les LEs) string { return fmt.Sprintf("Got file request %s to %s", src, sender.Name) }) - if !dryRun { + if !opts.DryRun { if jobPath != "" { - if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if opts.DoSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -541,7 +600,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } if len(sendmail) > 0 && ctx.NotifyFreq != nil { @@ -565,7 +624,7 @@ func jobProcess( } case PktTypeTrns: - if noTrns { + if opts.NoTrns { return nil } dst := new([MTHSize]byte) @@ -587,7 +646,7 @@ func jobProcess( return err } ctx.LogD("rx-tx", les, logMsg) - if !dryRun { + if !opts.DryRun { if len(node.Via) == 0 { if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil { ctx.LogE("rx", les, err, func(les LEs) string { @@ -603,11 +662,11 @@ func jobProcess( if err != nil { panic(err) } - if _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, pktTrns, nice, - int64(pktSize), 0, + int64(pktSize), 0, MaxFileSize, pipeR, pktName, nil, @@ -627,9 +686,12 @@ func jobProcess( humanize.IBytes(pktSize), ) }) - if !dryRun && jobPath != "" { - if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if !opts.DryRun && jobPath != "" { + if opts.DoSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -655,12 +717,12 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } case PktTypeArea: - if noArea { + if opts.NoArea { return nil } areaId := new(AreaId) @@ -691,14 +753,14 @@ func jobProcess( les = append(les, LE{"AreaMsg", msgHash}) ctx.LogD("rx-area", les, logMsg) - if dryRun { + if opts.DryRun { for _, nodeId := range area.Subs { node := ctx.Neigh[*nodeId] lesEcho := append(les, LE{"Echo", nodeId}) seenDir := filepath.Join( ctx.Spool, nodeId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) logMsgNode := func(les LEs) string { return fmt.Sprintf( "%s: echoing to: %s", logMsg(les), node.Name, @@ -719,7 +781,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, nodeId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) logMsgNode := func(les LEs) string { return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name) } @@ -731,8 +793,14 @@ func jobProcess( } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, err = ctx.Tx( - node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil, + if _, _, _, err = ctx.Tx( + node, + &pkt, + nice, + int64(pktSize), 0, MaxFileSize, + fullPipeR, + pktName, + nil, ); err != nil { ctx.LogE("rx-area", lesEcho, err, logMsgNode) return err @@ -759,12 +827,12 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) if _, err := os.Stat(seenPath); err == nil { ctx.LogD("rx-area-seen", les, func(les LEs) string { return logMsg(les) + ": already seen" }) - if !dryRun && jobPath != "" { + if !opts.DryRun && jobPath != "" { if err = os.Remove(jobPath); err != nil { ctx.LogE("rx-area-remove", les, err, func(les LEs) string { return fmt.Sprintf( @@ -776,7 +844,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } return nil @@ -831,7 +899,7 @@ func jobProcess( uint64(pktSizeWithoutEnc(int64(pktSize))), "", decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + opts, ) }() _, _, _, err = PktEncRead( @@ -843,6 +911,7 @@ func jobProcess( nil, ) if err != nil { + ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg) pipeW.CloseWithError(err) <-errs return err @@ -853,7 +922,7 @@ func jobProcess( } } - if !dryRun && jobPath != "" { + if !opts.DryRun && jobPath != "" { if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil { ctx.LogE("rx-area-mkdir", les, err, logMsg) return err @@ -876,9 +945,69 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) + } + } + + case PktTypeACK: + if opts.NoACK { + return nil + } + hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize]) + les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh}) + logMsg := func(les LEs) string { + return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh) + } + ctx.LogD("rx-ack", les, logMsg) + pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh) + if _, err := os.Stat(pktPath); err == nil { + if !opts.DryRun { + if err = os.Remove(pktPath); err != nil { + ctx.LogE("rx-ack", les, err, func(les LEs) string { + return logMsg(les) + ": removing packet" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(pktPath)) + } + } + } else { + ctx.LogD("rx-ack", les, func(les LEs) string { + return logMsg(les) + ": already disappeared" + }) + } + if !opts.DryRun && opts.DoSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } + } + } + if !opts.DryRun { + if err = os.Remove(jobPath); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": removing job" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(jobPath)) } } + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh) + }) default: err = errors.New("unknown type") @@ -896,12 +1025,7 @@ func jobProcess( return nil } -func (ctx *Ctx) Toss( - nodeId *NodeId, - xx TRxTx, - nice uint8, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, -) bool { +func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { return false @@ -920,7 +1044,7 @@ func (ctx *Ctx) Toss( {"Pkt", pktName}, {"Nice", int(job.PktEnc.Nice)}, } - if job.PktEnc.Nice > nice { + if job.PktEnc.Nice > opts.Nice { ctx.LogD("rx-too-nice", les, func(les LEs) string { return fmt.Sprintf( "Tossing %s/%s: too nice: %s", @@ -968,14 +1092,14 @@ func (ctx *Ctx) Toss( uint64(pktSizeWithoutEnc(job.Size)), job.Path, decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + opts, ) }() pipeWB := bufio.NewWriter(pipeW) sharedKey, _, _, err = PktEncRead( ctx.Self, ctx.Neigh, - bufio.NewReader(fd), + bufio.NewReaderSize(fd, MTHBlockSize), pipeWB, sharedKey == nil, sharedKey, @@ -1015,11 +1139,14 @@ func (ctx *Ctx) Toss( return isBad } -func (ctx *Ctx) AutoToss( - nodeId *NodeId, - nice uint8, - doSeen, noFile, noFreq, noExec, noTrns, noArea bool, -) (chan struct{}, chan bool) { +func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) { + dw, err := ctx.NewDirWatcher( + filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), + time.Second, + ) + if err != nil { + log.Fatalln(err) + } finish := make(chan struct{}) badCode := make(chan bool) go func() { @@ -1027,14 +1154,12 @@ func (ctx *Ctx) AutoToss( for { select { case <-finish: + dw.Close() badCode <- bad - break - default: + return + case <-dw.C: + bad = !ctx.Toss(nodeId, TRx, opts) || bad } - time.Sleep(time.Second) - bad = !ctx.Toss( - nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad } }() return finish, badCode