X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=bdfcb31aad6d4ec44b0cc18dd17df32ce0ad4dde;hb=0367cce2741e1ce6a89a49fd5c4e9df6005c9744;hp=a398649535cfc49441cd0acea808e680f26af7d9;hpb=2758c63b64cf12e5f457daeb78e830959d26faae;p=nncp.git diff --git a/src/toss.go b/src/toss.go index a398649..bdfcb31 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -43,9 +43,13 @@ import ( ) const ( - SeenSuffix = ".seen" + SeenDir = "seen" ) +func jobPath2Seen(jobPath string) string { + return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath)) +} + func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader { lines := []string{ "From: " + fromTo.From, @@ -153,9 +157,12 @@ func jobProcess( } else { cmd.Stdin = pipeR } - output, err := cmd.Output() + output, err := cmd.CombinedOutput() if err != nil { - ctx.LogE("rx-hande", les, err, func(les LEs) string { + les = append(les, LE{"Output", strings.Split( + strings.Trim(string(output), "\n"), "\n"), + }) + ctx.LogE("rx-handle", les, err, func(les LEs) string { return fmt.Sprintf( "Tossing exec %s/%s (%s): %s: handling", sender.Name, pktName, @@ -198,7 +205,10 @@ func jobProcess( }) if !dryRun && jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -223,7 +233,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } @@ -391,7 +401,10 @@ func jobProcess( if !dryRun { if jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -416,7 +429,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } if len(sendmail) > 0 && ctx.NotifyFile != nil { @@ -516,7 +529,10 @@ func jobProcess( if !dryRun { if jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -541,7 +557,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } if len(sendmail) > 0 && ctx.NotifyFreq != nil { @@ -603,11 +619,11 @@ func jobProcess( if err != nil { panic(err) } - if _, err = ctx.Tx( + if _, _, err = ctx.Tx( node, pktTrns, nice, - int64(pktSize), 0, + int64(pktSize), 0, MaxFileSize, pipeR, pktName, nil, @@ -629,7 +645,10 @@ func jobProcess( }) if !dryRun && jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { fd.Close() if err = DirSync(filepath.Dir(jobPath)); err != nil { ctx.LogE("rx-dirsync", les, err, func(les LEs) string { @@ -655,7 +674,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } @@ -698,7 +717,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, nodeId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) logMsgNode := func(les LEs) string { return fmt.Sprintf( "%s: echoing to: %s", logMsg(les), node.Name, @@ -719,7 +738,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, nodeId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) logMsgNode := func(les LEs) string { return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name) } @@ -731,8 +750,14 @@ func jobProcess( } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, err = ctx.Tx( - node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil, + if _, _, err = ctx.Tx( + node, + &pkt, + nice, + int64(pktSize), 0, MaxFileSize, + fullPipeR, + pktName, + nil, ); err != nil { ctx.LogE("rx-area", lesEcho, err, logMsgNode) return err @@ -759,7 +784,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) if _, err := os.Stat(seenPath); err == nil { ctx.LogD("rx-area-seen", les, func(les LEs) string { return logMsg(les) + ": already seen" @@ -776,7 +801,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } return nil @@ -843,6 +868,7 @@ func jobProcess( nil, ) if err != nil { + ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg) pipeW.CloseWithError(err) <-errs return err @@ -876,7 +902,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } @@ -1020,6 +1046,13 @@ func (ctx *Ctx) AutoToss( nice uint8, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, ) (chan struct{}, chan bool) { + dw, err := ctx.NewDirWatcher( + filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), + time.Second, + ) + if err != nil { + log.Fatalln(err) + } finish := make(chan struct{}) badCode := make(chan bool) go func() { @@ -1027,14 +1060,14 @@ func (ctx *Ctx) AutoToss( for { select { case <-finish: + dw.Close() badCode <- bad - break - default: + return + case <-dw.C: + bad = !ctx.Toss( + nodeId, TRx, nice, false, + doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad } - time.Sleep(time.Second) - bad = !ctx.Toss( - nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad } }() return finish, badCode