From 8e400e0530b58e1ac84d9a4db3175d396a39abf0 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 18 Nov 2017 23:27:19 +0300 Subject: [PATCH] Ability to create and deal with .seen files after tossing --- doc/cmds.texi | 7 ++++- doc/news.ru.texi | 3 +++ doc/news.texi | 3 +++ doc/spool.texi | 6 +++++ .../nncp/cmd/nncp-bundle/main.go | 4 +++ src/cypherpunks.ru/nncp/cmd/nncp-toss/main.go | 3 ++- src/cypherpunks.ru/nncp/cmd/nncp-xfer/main.go | 10 +++++++ src/cypherpunks.ru/nncp/toss.go | 26 ++++++++++++++++++- src/cypherpunks.ru/nncp/toss_test.go | 20 +++++++------- 9 files changed, 69 insertions(+), 13 deletions(-) diff --git a/doc/cmds.texi b/doc/cmds.texi index f1a029a..b5db1c0 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -418,7 +418,7 @@ queues. @section nncp-toss @verbatim -% nncp-toss [options] [-dryrun] [-cycle INT] +% nncp-toss [options] [-dryrun] [-cycle INT] [-seen] @end verbatim Perform "tossing" operation on all inbound packets. This is the tool @@ -433,6 +433,11 @@ tells what it will do. @option{INT} seconds in an infinite loop. That can be useful when running this command as a daemon. +@option{-seen} option creates empty @file{XXX.seen} file after +successful tossing of @file{XXX} packet. @ref{nncp-xfer} and +@ref{nncp-bundle} commands skip inbound packets that has been already +seen, processed and tossed. This is helpful to defeat duplicates. + @node nncp-xfer @section nncp-xfer diff --git a/doc/news.ru.texi b/doc/news.ru.texi index f0a2b6f..dd3e0a8 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -13,6 +13,9 @@ методах передачи (например запись на CD-ROM без создания промежуточного подготовленного ISO образа или работа с ленточными накопителями). @item +@command{nncp-toss} команда может создавать @file{.seen} файлы, +предотвращая приём дублированных пакетов. +@item В команде @command{nncp-call} разрешается иметь только одного обработчика контрольной суммы в фоне. Это полезно когда тысячи маленьких входящих пакетов могут создать много горутин. diff --git a/doc/news.texi b/doc/news.texi index 190d04a..aa89c7d 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -15,6 +15,9 @@ packets, or digest it. It is useful when dealing with stdin/stdout based transmission methods (like writing to CD-ROM without intermediate prepared ISO image and working with tape drives). @item +@command{nncp-toss} is able to create @file{.seen} files preventing +duplicate packets receiving. +@item Single background checksum verifier worker is allowed in @command{nncp-call}. This is helpful when thousands of small inbound packets could create many goroutines. diff --git a/doc/spool.texi b/doc/spool.texi index 53ea301..a5db2dc 100644 --- a/doc/spool.texi +++ b/doc/spool.texi @@ -13,7 +13,9 @@ spool/2WHB...OABQ/tx.lock spool/BYRR...CG6Q/rx.lock spool/BYRR...CG6Q/rx/ spool/BYRR...CG6Q/tx.lock +spool/BYRR...CG6Q/tx/AQUT...DGNT.seen spool/BYRR...CG6Q/tx/NSYY...ZUU6 +spool/BYRR...CG6Q/tx/VCSR...3VXX.seen spool/BYRR...CG6Q/tx/ZI5U...5RRQ @end verbatim @@ -30,5 +32,9 @@ partly received file from @file{2WHB...OABQ} node. @file{tx} directory can not contain partly written files -- they are moved atomically from @file{tmp}. +When @ref{nncp-toss} utility is called with @option{-seen} option, it +will create empty @file{XXX.seen} files, telling that some kind of +packet already was tossed sometime. + Only one process can work with @file{rx}/@file{tx} directories at once, so there are corresponding lock files. diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go index 36c9cc9..210a140 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-bundle/main.go @@ -264,6 +264,10 @@ func main() { ctx.LogD("nncp-bundle", sds, "Packet already exists") continue } + if _, err = os.Stat(dstPath + nncp.SeenPostfix); err == nil || !os.IsNotExist(err) { + ctx.LogD("nncp-bundle", sds, "Packet already exists") + continue + } if *doCheck { tmp, err := ctx.NewTmpFileWHash() if err != nil { diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-toss/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-toss/main.go index 71a15ee..12dc214 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-toss/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-toss/main.go @@ -43,6 +43,7 @@ func main() { nodeRaw = flag.String("node", "", "Process only that node") niceRaw = flag.Int("nice", 255, "Minimal required niceness") dryRun = flag.Bool("dryrun", false, "Do not actually write any tossed data") + doSeen = flag.Bool("seen", false, "Create .seen files") cycle = flag.Uint("cycle", 0, "Repeat tossing after N seconds in infinite loop") quiet = flag.Bool("quiet", false, "Print only errors") debug = flag.Bool("debug", false, "Print debug messages") @@ -92,7 +93,7 @@ Cycle: if nodeOnly != nil && nodeId != *nodeOnly.Id { continue } - isBad = ctx.Toss(node.Id, nice, *dryRun) + isBad = ctx.Toss(node.Id, nice, *dryRun, *doSeen) } if *cycle > 0 { time.Sleep(time.Duration(*cycle) * time.Second) diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-xfer/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-xfer/main.go index 5abac1f..05a3748 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-xfer/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-xfer/main.go @@ -285,6 +285,16 @@ Tx: job.Fd.Close() continue } + if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { + ctx.LogD("nncp-xfer", sds, "already exists") + job.Fd.Close() + continue + } + if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenPostfix)); err == nil || !os.IsNotExist(err) { + ctx.LogD("nncp-xfer", sds, "already exists") + job.Fd.Close() + continue + } tmp, err := ioutil.TempFile(dstPath, "nncp-xfer") if err != nil { ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mktemp") diff --git a/src/cypherpunks.ru/nncp/toss.go b/src/cypherpunks.ru/nncp/toss.go index bdd7da4..a1fa8f3 100644 --- a/src/cypherpunks.ru/nncp/toss.go +++ b/src/cypherpunks.ru/nncp/toss.go @@ -38,6 +38,10 @@ import ( "golang.org/x/crypto/blake2b" ) +const ( + SeenPostfix = ".seen" +) + func newNotification(fromTo *FromToYAML, subject string) io.Reader { return strings.NewReader(fmt.Sprintf( "From: %s\nTo: %s\nSubject: %s\n", @@ -47,7 +51,7 @@ func newNotification(fromTo *FromToYAML, subject string) io.Reader { )) } -func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool { +func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun, doSeen bool) bool { isBad := false for job := range ctx.Jobs(nodeId, TRx) { pktName := filepath.Base(job.Fd.Name()) @@ -123,6 +127,11 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool { } ctx.LogI("rx", sds, "") if !dryRun { + if doSeen { + if fd, err := os.Create(job.Fd.Name() + SeenPostfix); err == nil { + fd.Close() + } + } if err = os.Remove(job.Fd.Name()); err != nil { ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") isBad = true @@ -189,6 +198,11 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool { } ctx.LogI("rx", sds, "") if !dryRun { + if doSeen { + if fd, err := os.Create(job.Fd.Name() + SeenPostfix); err == nil { + fd.Close() + } + } if err = os.Remove(job.Fd.Name()); err != nil { ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") isBad = true @@ -258,6 +272,11 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool { } ctx.LogI("rx", sds, "") if !dryRun { + if doSeen { + if fd, err := os.Create(job.Fd.Name() + SeenPostfix); err == nil { + fd.Close() + } + } if err = os.Remove(job.Fd.Name()); err != nil { ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") isBad = true @@ -297,6 +316,11 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool { } ctx.LogI("rx", sds, "") if !dryRun { + if doSeen { + if fd, err := os.Create(job.Fd.Name() + SeenPostfix); err == nil { + fd.Close() + } + } if err = os.Remove(job.Fd.Name()); err != nil { ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove") isBad = true diff --git a/src/cypherpunks.ru/nncp/toss_test.go b/src/cypherpunks.ru/nncp/toss_test.go index c40c9d8..47a4ba4 100644 --- a/src/cypherpunks.ru/nncp/toss_test.go +++ b/src/cypherpunks.ru/nncp/toss_test.go @@ -105,12 +105,12 @@ func TestTossEmail(t *testing.T) { if len(dirFiles(rxPath)) == 0 { continue } - ctx.Toss(ctx.Self.Id, DefaultNiceMail-1, false) + ctx.Toss(ctx.Self.Id, DefaultNiceMail-1, false, false) if len(dirFiles(rxPath)) == 0 { return false } ctx.Neigh[*nodeOur.Id].Sendmail = []string{"/bin/sh", "-c", "false"} - ctx.Toss(ctx.Self.Id, DefaultNiceMail, false) + ctx.Toss(ctx.Self.Id, DefaultNiceMail, false, false) if len(dirFiles(rxPath)) == 0 { return false } @@ -118,7 +118,7 @@ func TestTossEmail(t *testing.T) { "/bin/sh", "-c", fmt.Sprintf("cat >> %s", filepath.Join(spool, "mbox")), } - ctx.Toss(ctx.Self.Id, DefaultNiceMail, false) + ctx.Toss(ctx.Self.Id, DefaultNiceMail, false, false) if len(dirFiles(rxPath)) != 0 { return false } @@ -190,12 +190,12 @@ func TestTossFile(t *testing.T) { } rxPath := filepath.Join(spool, ctx.Self.Id.String(), string(TRx)) os.Rename(filepath.Join(spool, ctx.Self.Id.String(), string(TTx)), rxPath) - ctx.Toss(ctx.Self.Id, DefaultNiceFile, false) + ctx.Toss(ctx.Self.Id, DefaultNiceFile, false, false) if len(dirFiles(rxPath)) == 0 { return false } ctx.Neigh[*nodeOur.Id].Incoming = &incomingPath - ctx.Toss(ctx.Self.Id, DefaultNiceFile, false) + ctx.Toss(ctx.Self.Id, DefaultNiceFile, false, false) if len(dirFiles(rxPath)) != 0 { return false } @@ -262,7 +262,7 @@ func TestTossFileSameName(t *testing.T) { rxPath := filepath.Join(spool, ctx.Self.Id.String(), string(TRx)) os.Rename(filepath.Join(spool, ctx.Self.Id.String(), string(TTx)), rxPath) ctx.Neigh[*nodeOur.Id].Incoming = &incomingPath - ctx.Toss(ctx.Self.Id, DefaultNiceFile, false) + ctx.Toss(ctx.Self.Id, DefaultNiceFile, false, false) expected := make(map[string]struct{}) expected["samefile"] = struct{}{} for i := 0; i < files-1; i++ { @@ -330,12 +330,12 @@ func TestTossFreq(t *testing.T) { txPath := filepath.Join(spool, ctx.Self.Id.String(), string(TTx)) os.Rename(txPath, rxPath) os.MkdirAll(txPath, os.FileMode(0700)) - ctx.Toss(ctx.Self.Id, DefaultNiceFreq, false) + ctx.Toss(ctx.Self.Id, DefaultNiceFreq, false, false) if len(dirFiles(txPath)) != 0 || len(dirFiles(rxPath)) == 0 { return false } ctx.Neigh[*nodeOur.Id].Freq = &spool - ctx.Toss(ctx.Self.Id, DefaultNiceFreq, false) + ctx.Toss(ctx.Self.Id, DefaultNiceFreq, false, false) if len(dirFiles(txPath)) != 0 || len(dirFiles(rxPath)) == 0 { return false } @@ -348,7 +348,7 @@ func TestTossFreq(t *testing.T) { panic(err) } } - ctx.Toss(ctx.Self.Id, DefaultNiceFreq, false) + ctx.Toss(ctx.Self.Id, DefaultNiceFreq, false, false) if len(dirFiles(txPath)) == 0 || len(dirFiles(rxPath)) != 0 { return false } @@ -441,7 +441,7 @@ func TestTossTrns(t *testing.T) { panic(err) } } - ctx.Toss(ctx.Self.Id, 123, false) + ctx.Toss(ctx.Self.Id, 123, false, false) if len(dirFiles(rxPath)) != 0 { return false } -- 2.44.0