X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=blobdiff_plain;f=src%2Ftoss.go;fp=src%2Ftoss.go;h=c4710494bc1a56d8ba5d785f89e0ce44ba23eb29;hp=b4363f713ea7856a16df74e26ff5a10ae0413ae6;hb=a13dfe188901835b627b02e1fc25638f5c9f68d5;hpb=9edd0bca196b4a7722ee64c27a8a2864f0677ad8 diff --git a/src/toss.go b/src/toss.go index b4363f7..c471049 100644 --- a/src/toss.go +++ b/src/toss.go @@ -44,8 +44,22 @@ import ( const ( SeenDir = "seen" + ACKDir = "ack" ) +type TossOpts struct { + Nice uint8 + DryRun bool + DoSeen bool + NoFile bool + NoFreq bool + NoExec bool + NoTrns bool + NoArea bool + NoACK bool + GenACK bool +} + func jobPath2Seen(jobPath string) string { return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath)) } @@ -91,7 +105,7 @@ func jobProcess( pktSize uint64, jobPath string, decompressor *zstd.Decoder, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, + opts *TossOpts, ) error { defer pipeR.Close() sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] @@ -111,9 +125,36 @@ func jobProcess( humanize.IBytes(pktSize), ) }) + if opts.GenACK && pkt.Type != PktTypeACK { + newPktName, err := ctx.TxACK( + sender, sender.ACKNice, pktName, sender.ACKMinSize, + ) + if err != nil { + ctx.LogE("rx-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: generating ACK", sender.Name, pktName) + }) + return err + } + ackDir := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), ACKDir) + os.MkdirAll(ackDir, os.FileMode(0777)) + if fd, err := os.Create(filepath.Join(ackDir, newPktName)); err == nil { + fd.Close() + if err = DirSync(ackDir); err != nil { + ctx.LogE("rx-genack", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName) + }) + return err + } + } else { + ctx.LogE("rx-genack", les, err, func(les LEs) string { + return fmt.Sprintf("Tossing %s/%s: genACK", sender.Name, pktName) + }) + return err + } + } switch pkt.Type { case PktTypeExec, PktTypeExecFat: - if noExec { + if opts.NoExec { return nil } path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0}) @@ -144,7 +185,7 @@ func jobProcess( log.Fatalln(err) } } - if !dryRun { + if !opts.DryRun { cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...) cmd.Env = append( cmd.Env, @@ -203,8 +244,8 @@ func jobProcess( humanize.IBytes(pktSize), ) }) - if !dryRun && jobPath != "" { - if doSeen { + if !opts.DryRun && jobPath != "" { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -238,7 +279,7 @@ func jobProcess( } case PktTypeFile: - if noFile { + if opts.NoFile { return nil } dst := string(pkt.Path[:int(pkt.PathLen)]) @@ -283,7 +324,7 @@ func jobProcess( }) return err } - if !dryRun { + if !opts.DryRun { tmp, err := TempFile(dir, "file") if err != nil { ctx.LogE("rx-mktemp", les, err, func(les LEs) string { @@ -400,9 +441,9 @@ func jobProcess( dst, humanize.IBytes(pktSize), sender.Name, ) }) - if !dryRun { + if !opts.DryRun { if jobPath != "" { - if doSeen { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -456,7 +497,7 @@ func jobProcess( } case PktTypeFreq: - if noFreq { + if opts.NoFreq { return nil } src := string(pkt.Path[:int(pkt.PathLen)]) @@ -503,7 +544,7 @@ func jobProcess( ) return err } - if !dryRun { + if !opts.DryRun { err = ctx.TxFile( sender, pkt.Nice, @@ -528,9 +569,9 @@ func jobProcess( ctx.LogI("rx", les, func(les LEs) string { return fmt.Sprintf("Got file request %s to %s", src, sender.Name) }) - if !dryRun { + if !opts.DryRun { if jobPath != "" { - if doSeen { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -583,7 +624,7 @@ func jobProcess( } case PktTypeTrns: - if noTrns { + if opts.NoTrns { return nil } dst := new([MTHSize]byte) @@ -605,7 +646,7 @@ func jobProcess( return err } ctx.LogD("rx-tx", les, logMsg) - if !dryRun { + if !opts.DryRun { if len(node.Via) == 0 { if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil { ctx.LogE("rx", les, err, func(les LEs) string { @@ -645,8 +686,8 @@ func jobProcess( humanize.IBytes(pktSize), ) }) - if !dryRun && jobPath != "" { - if doSeen { + if !opts.DryRun && jobPath != "" { + if opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -681,7 +722,7 @@ func jobProcess( } case PktTypeArea: - if noArea { + if opts.NoArea { return nil } areaId := new(AreaId) @@ -712,7 +753,7 @@ func jobProcess( les = append(les, LE{"AreaMsg", msgHash}) ctx.LogD("rx-area", les, logMsg) - if dryRun { + if opts.DryRun { for _, nodeId := range area.Subs { node := ctx.Neigh[*nodeId] lesEcho := append(les, LE{"Echo", nodeId}) @@ -791,7 +832,7 @@ func jobProcess( ctx.LogD("rx-area-seen", les, func(les LEs) string { return logMsg(les) + ": already seen" }) - if !dryRun && jobPath != "" { + if !opts.DryRun && jobPath != "" { if err = os.Remove(jobPath); err != nil { ctx.LogE("rx-area-remove", les, err, func(les LEs) string { return fmt.Sprintf( @@ -858,7 +899,7 @@ func jobProcess( uint64(pktSizeWithoutEnc(int64(pktSize))), "", decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK, + opts, ) }() _, _, _, err = PktEncRead( @@ -881,7 +922,7 @@ func jobProcess( } } - if !dryRun && jobPath != "" { + if !opts.DryRun && jobPath != "" { if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil { ctx.LogE("rx-area-mkdir", les, err, logMsg) return err @@ -909,7 +950,7 @@ func jobProcess( } case PktTypeACK: - if noACK { + if opts.NoACK { return nil } hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize]) @@ -920,7 +961,7 @@ func jobProcess( ctx.LogD("rx-ack", les, logMsg) pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh) if _, err := os.Stat(pktPath); err == nil { - if !dryRun { + if !opts.DryRun { if err = os.Remove(pktPath); err != nil { ctx.LogE("rx-ack", les, err, func(les LEs) string { return logMsg(les) + ": removing packet" @@ -935,7 +976,7 @@ func jobProcess( return logMsg(les) + ": already disappeared" }) } - if !dryRun && doSeen { + if !opts.DryRun && opts.DoSeen { if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { return err } @@ -954,7 +995,7 @@ func jobProcess( } } } - if !dryRun { + if !opts.DryRun { if err = os.Remove(jobPath); err != nil { ctx.LogE("rx", les, err, func(les LEs) string { return logMsg(les) + ": removing job" @@ -984,12 +1025,7 @@ func jobProcess( return nil } -func (ctx *Ctx) Toss( - nodeId *NodeId, - xx TRxTx, - nice uint8, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, -) bool { +func (ctx *Ctx) Toss(nodeId *NodeId, xx TRxTx, opts *TossOpts) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { return false @@ -1008,7 +1044,7 @@ func (ctx *Ctx) Toss( {"Pkt", pktName}, {"Nice", int(job.PktEnc.Nice)}, } - if job.PktEnc.Nice > nice { + if job.PktEnc.Nice > opts.Nice { ctx.LogD("rx-too-nice", les, func(les LEs) string { return fmt.Sprintf( "Tossing %s/%s: too nice: %s", @@ -1056,7 +1092,7 @@ func (ctx *Ctx) Toss( uint64(pktSizeWithoutEnc(job.Size)), job.Path, decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK, + opts, ) }() pipeWB := bufio.NewWriter(pipeW) @@ -1103,11 +1139,7 @@ func (ctx *Ctx) Toss( return isBad } -func (ctx *Ctx) AutoToss( - nodeId *NodeId, - nice uint8, - doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, -) (chan struct{}, chan bool) { +func (ctx *Ctx) AutoToss(nodeId *NodeId, opts *TossOpts) (chan struct{}, chan bool) { dw, err := ctx.NewDirWatcher( filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), time.Second, @@ -1126,9 +1158,7 @@ func (ctx *Ctx) AutoToss( badCode <- bad return case <-dw.C: - bad = !ctx.Toss( - nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad + bad = !ctx.Toss(nodeId, TRx, opts) || bad } } }()