X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=c4710494bc1a56d8ba5d785f89e0ce44ba23eb29;hb=2e22bda93fdf8f2f84e4d19b3f1d46318b497139;hp=0c27289e2cf55359ed8398f58d086c1b1eb97c88;hpb=a43910e6e9940fe088c38ec849bc983b54335d45;p=nncp.git diff --git a/src/toss.go b/src/toss.go index 0c27289..c471049 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2023 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,7 @@ import ( "errors" "fmt" "io" - "io/ioutil" + "io/fs" "log" "mime" "os" @@ -44,8 +44,22 @@ import ( const ( SeenDir = "seen" + ACKDir = "ack" ) +type TossOpts struct { + Nice uint8 + DryRun bool + DoSeen bool + NoFile bool + NoFreq bool + NoExec bool + NoTrns bool + NoArea bool + NoACK bool + GenACK bool +} + func jobPath2Seen(jobPath string) string { return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath)) } @@ -91,7 +105,7 @@ func jobProcess( pktSize uint64, jobPath string, decompressor *zstd.Decoder, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + opts *TossOpts, ) error { defer pipeR.Close() sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] @@ -111,9 +125,36 @@ func jobProcess( humanize.IBytes(pktSize), ) }) + if opts.GenACK && pkt.Type != PktTypeACK { + newPktName, err := ctx.TxACK( + sender, sender.ACKNice, pktName, sender.ACKMinSize, + ) + if err != nil { + ctx.LogE("rx-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName) + }) + return err + } + ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir) + os.MkdirAll(ackDir, os.FileMode(0777)) + if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil { + fd.Close() + if err = DirSync(ackDir); err != nil { + ctx.LogE("rx-genack", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName) + }) + return err + } + } else { + ctx.LogE("rx-genack", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName) + }) + return err + } + } switch pkt.Type { case PktTypeExec, PktTypeExecFat: - if noExec { + if opts.NoExec { return nil } path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0}) @@ -144,7 +185,7 @@ func jobProcess( log.Fatalln(err) } } - if !dryRun { + if !opts.DryRun { cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...) cmd.Env = append( cmd.Env, @@ -203,8 +244,8 @@ func jobProcess( humanize.IBytes(pktSize), ) }) - if !dryRun && jobPath != "" { - if doSeen { + if !opts.DryRun && jobPath != "" { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -238,7 +279,7 @@ func jobProcess( } case PktTypeFile: - if noFile { + if opts.NoFile { return nil } dst := string(pkt.Path[:int(pkt.PathLen)]) @@ -283,7 +324,7 @@ func jobProcess( }) return err } - if !dryRun { + if !opts.DryRun { tmp, err := TempFile(dir, "file") if err != nil { ctx.LogE("rx-mktemp", les, err, func(les LEs) string { @@ -329,16 +370,18 @@ func jobProcess( }) return err } - if err = tmp.Sync(); err != nil { - tmp.Close() - ctx.LogE("rx-sync", les, err, func(les LEs) string { - return fmt.Sprintf( - "Tossing file %s/%s (%s): %s: syncing", - sender.Name, pktName, - humanize.IBytes(pktSize), dst, - ) - }) - return err + if !NoSync { + if err = tmp.Sync(); err != nil { + tmp.Close() + ctx.LogE("rx-sync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: syncing", + sender.Name, pktName, + humanize.IBytes(pktSize), dst, + ) + }) + return err + } } if err = tmp.Close(); err != nil { ctx.LogE("rx-close", les, err, func(les LEs) string { @@ -355,7 +398,7 @@ func jobProcess( dstPathCtr := 0 for { if _, err = os.Stat(dstPath); err != nil { - if os.IsNotExist(err) { + if errors.Is(err, fs.ErrNotExist) { break } ctx.LogE("rx-stat", les, err, func(les LEs) string { @@ -398,9 +441,9 @@ func jobProcess( dst, humanize.IBytes(pktSize), sender.Name, ) }) - if !dryRun { + if !opts.DryRun { if jobPath != "" { - if doSeen { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -454,7 +497,7 @@ func jobProcess( } case PktTypeFreq: - if noFreq { + if opts.NoFreq { return nil } src := string(pkt.Path[:int(pkt.PathLen)]) @@ -473,7 +516,7 @@ func jobProcess( ) return err } - dstRaw, err := ioutil.ReadAll(pipeR) + dstRaw, err := io.ReadAll(pipeR) if err != nil { ctx.LogE("rx-read", les, err, func(les LEs) string { return fmt.Sprintf( @@ -501,7 +544,7 @@ func jobProcess( ) return err } - if !dryRun { + if !opts.DryRun { err = ctx.TxFile( sender, pkt.Nice, @@ -526,9 +569,9 @@ func jobProcess( ctx.LogI("rx", les, func(les LEs) string { return fmt.Sprintf("Got file request %s to %s", src, sender.Name) }) - if !dryRun { + if !opts.DryRun { if jobPath != "" { - if doSeen { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -581,7 +624,7 @@ func jobProcess( } case PktTypeTrns: - if noTrns { + if opts.NoTrns { return nil } dst := new([MTHSize]byte) @@ -603,7 +646,7 @@ func jobProcess( return err } ctx.LogD("rx-tx", les, logMsg) - if !dryRun { + if !opts.DryRun { if len(node.Via) == 0 { if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil { ctx.LogE("rx", les, err, func(les LEs) string { @@ -619,7 +662,7 @@ func jobProcess( if err != nil { panic(err) } - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, pktTrns, nice, @@ -643,8 +686,8 @@ func jobProcess( humanize.IBytes(pktSize), ) }) - if !dryRun && jobPath != "" { - if doSeen { + if !opts.DryRun && jobPath != "" { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -679,7 +722,7 @@ func jobProcess( } case PktTypeArea: - if noArea { + if opts.NoArea { return nil } areaId := new(AreaId) @@ -710,7 +753,7 @@ func jobProcess( les = append(les, LE{"AreaMsg", msgHash}) ctx.LogD("rx-area", les, logMsg) - if dryRun { + if opts.DryRun { for _, nodeId := range area.Subs { node := ctx.Neigh[*nodeId] lesEcho := append(les, LE{"Echo", nodeId}) @@ -750,7 +793,7 @@ func jobProcess( } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, &pkt, nice, @@ -789,7 +832,7 @@ func jobProcess( ctx.LogD("rx-area-seen", les, func(les LEs) string { return logMsg(les) + ": already seen" }) - if !dryRun && jobPath != "" { + if !opts.DryRun && jobPath != "" { if err = os.Remove(jobPath); err != nil { ctx.LogE("rx-area-remove", les, err, func(les LEs) string { return fmt.Sprintf( @@ -856,7 +899,7 @@ func jobProcess( uint64(pktSizeWithoutEnc(int64(pktSize))), "", decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + opts, ) }() _, _, _, err = PktEncRead( @@ -879,7 +922,7 @@ func jobProcess( } } - if !dryRun && jobPath != "" { + if !opts.DryRun && jobPath != "" { if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil { ctx.LogE("rx-area-mkdir", les, err, logMsg) return err @@ -906,6 +949,66 @@ func jobProcess( } } + case PktTypeACK: + if opts.NoACK { + return nil + } + hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize]) + les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh}) + logMsg := func(les LEs) string { + return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh) + } + ctx.LogD("rx-ack", les, logMsg) + pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh) + if _, err := os.Stat(pktPath); err == nil { + if !opts.DryRun { + if err = os.Remove(pktPath); err != nil { + ctx.LogE("rx-ack", les, err, func(les LEs) string { + return logMsg(les) + ": removing packet" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(pktPath)) + } + } + } else { + ctx.LogD("rx-ack", les, func(les LEs) string { + return logMsg(les) + ": already disappeared" + }) + } + if !opts.DryRun && opts.DoSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } + } + } + if !opts.DryRun { + if err = os.Remove(jobPath); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": removing job" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(jobPath)) + } + } + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh) + }) + default: err = errors.New("unknown type") ctx.LogE( @@ -922,12 +1025,7 @@ func jobProcess( return nil } -func (ctx *Ctx) Toss( - nodeId *NodeId, - xx TRxTx, - nice uint8, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, -) bool { +func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { return false @@ -946,7 +1044,7 @@ func (ctx *Ctx) Toss( {"Pkt", pktName}, {"Nice", int(job.PktEnc.Nice)}, } - if job.PktEnc.Nice > nice { + if job.PktEnc.Nice > opts.Nice { ctx.LogD("rx-too-nice", les, func(les LEs) string { return fmt.Sprintf( "Tossing %s/%s: too nice: %s", @@ -994,14 +1092,14 @@ func (ctx *Ctx) Toss( uint64(pktSizeWithoutEnc(job.Size)), job.Path, decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + opts, ) }() pipeWB := bufio.NewWriter(pipeW) sharedKey, _, _, err = PktEncRead( ctx.Self, ctx.Neigh, - bufio.NewReader(fd), + bufio.NewReaderSize(fd, MTHBlockSize), pipeWB, sharedKey == nil, sharedKey, @@ -1041,11 +1139,7 @@ func (ctx *Ctx) Toss( return isBad } -func (ctx *Ctx) AutoToss( - nodeId *NodeId, - nice uint8, - doSeen, noFile, noFreq, noExec, noTrns, noArea bool, -) (chan struct{}, chan bool) { +func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) { dw, err := ctx.NewDirWatcher( filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), time.Second, @@ -1064,9 +1158,7 @@ func (ctx *Ctx) AutoToss( badCode <- bad return case <-dw.C: - bad = !ctx.Toss( - nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad + bad = !ctx.Toss(nodeId, TRx, opts) || bad } } }()