From 726c119e6b2340994ada9fbd0e252acd31fb78b5 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sun, 29 Aug 2021 20:52:35 +0300 Subject: [PATCH] fsnotify usage --- doc/building.texi | 6 +++ doc/download.texi | 1 + doc/news.ru.texi | 11 +++++ doc/news.texi | 11 +++++ src/cmd/nncp-toss/main.go | 73 +++++++++++++++++++++-------- src/dirwatch.go | 99 +++++++++++++++++++++++++++++++++++++++ src/dirwatch_dummy.go | 45 ++++++++++++++++++ src/go.mod | 3 +- src/go.sum | 5 +- src/nncp.go | 2 +- src/sp.go | 14 ++++-- src/toss.go | 19 +++++--- 12 files changed, 258 insertions(+), 31 deletions(-) create mode 100644 src/dirwatch.go create mode 100644 src/dirwatch_dummy.go diff --git a/doc/building.texi b/doc/building.texi index e669923..13c40a5 100644 --- a/doc/building.texi +++ b/doc/building.texi @@ -40,3 +40,9 @@ install binaries and info-documentation: @example # PREFIX=/usr/local redo install @end example + +NNCP depends on @code{github.com/fsnotify/fsnotify} library, that is +solely relies on OS-specific mechanisms. There is possibility that you +have either broken or unsupported ones. You can still build NNCP with +@code{-tags nofsnotify} build option, to skip @code{fsnotify} library +usage at all. diff --git a/doc/download.texi b/doc/download.texi index ad239cf..802a634 100644 --- a/doc/download.texi +++ b/doc/download.texi @@ -12,6 +12,7 @@ Tarballs include all necessary required libraries: @item @code{github.com/davecgh/go-xdr} @tab ISC @item @code{github.com/dustin/go-humanize} @tab MIT @item @code{github.com/flynn/noise} @tab BSD 3-Clause +@item @code{github.com/fsnotify/fsnotify} @tab BSD 3-Clause @item @code{github.com/gorhill/cronexpr} @tab GNU GPLv3 @item @code{github.com/gosuri/uilive} @tab MIT @item @code{github.com/hjson/hjson-go} @tab MIT diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 5de3cba..24933c6 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,6 +1,17 @@ @node Новости @section Новости +@node Релиз 7.7.0 +@subsection Релиз 7.7.0 +@itemize + +@item +Экспериментальная поддержка @code{kqueue} и @code{inotify} оповещений об +изменениях в spool директориях, для сокращения накладных расходов на их +частое чтение. + +@end itemize + @node Релиз 7.6.0 @subsection Релиз 7.6.0 @itemize diff --git a/doc/news.texi b/doc/news.texi index e737a52..c5a7124 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,6 +3,17 @@ See also this page @ref{Новости, on russian}. +@node Release 7_7_0 +@section Release 7.7.0 +@itemize + +@item +Experimental @code{kqueue} and @code{inotify} based notifications +support about spool directory changes, for reducing their often reading +overhead. + +@end itemize + @node Release 7_6_0 @section Release 7.6.0 @itemize diff --git a/src/cmd/nncp-toss/main.go b/src/cmd/nncp-toss/main.go index af3a59d..8497486 100644 --- a/src/cmd/nncp-toss/main.go +++ b/src/cmd/nncp-toss/main.go @@ -23,6 +23,7 @@ import ( "fmt" "log" "os" + "path/filepath" "time" "go.cypherpunks.ru/nncp/v7" @@ -99,32 +100,66 @@ func main() { ctx.Umask() -Cycle: - isBad := false + if *cycle == 0 { + isBad := false + for nodeId, node := range ctx.Neigh { + if nodeOnly != nil && nodeId != *nodeOnly.Id { + continue + } + isBad = ctx.Toss( + node.Id, + nncp.TRx, + nice, + *dryRun, *doSeen, *noFile, *noFreq, *noExec, *noTrns, *noArea, + ) || isBad + if nodeId == *ctx.SelfId { + isBad = ctx.Toss( + node.Id, + nncp.TTx, + nice, + *dryRun, false, true, true, true, true, *noArea, + ) || isBad + } + } + if isBad { + os.Exit(1) + } + return + } + + nodeIds := make(chan *nncp.NodeId) for nodeId, node := range ctx.Neigh { if nodeOnly != nil && nodeId != *nodeOnly.Id { continue } - isBad = ctx.Toss( - node.Id, - nncp.TRx, - nice, - *dryRun, *doSeen, *noFile, *noFreq, *noExec, *noTrns, *noArea, - ) || isBad - if nodeId == *ctx.SelfId { - isBad = ctx.Toss( - node.Id, + dw, err := ctx.NewDirWatcher( + filepath.Join(ctx.Spool, node.Id.String(), string(nncp.TRx)), + time.Second*time.Duration(*cycle), + ) + if err != nil { + log.Fatalln(err) + } + go func(nodeId *nncp.NodeId) { + for range dw.C { + nodeIds <- nodeId + } + }(node.Id) + } + for nodeId := range nodeIds { + if *nodeId == *ctx.SelfId { + ctx.Toss( + nodeId, nncp.TTx, nice, *dryRun, false, true, true, true, true, *noArea, - ) || isBad + ) + } else { + ctx.Toss( + nodeId, + nncp.TRx, + nice, + *dryRun, *doSeen, *noFile, *noFreq, *noExec, *noTrns, *noArea, + ) } } - if *cycle > 0 { - time.Sleep(time.Duration(*cycle) * time.Second) - goto Cycle - } - if isBad { - os.Exit(1) - } } diff --git a/src/dirwatch.go b/src/dirwatch.go new file mode 100644 index 0000000..6637535 --- /dev/null +++ b/src/dirwatch.go @@ -0,0 +1,99 @@ +// +build !nofsnotify + +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package nncp + +import ( + "fmt" + "os" + "time" + + "github.com/fsnotify/fsnotify" +) + +type DirWatcher struct { + w *fsnotify.Watcher + C chan struct{} + isDead chan struct{} +} + +func (ctx *Ctx) NewDirWatcher(dir string, d time.Duration) (*DirWatcher, error) { + w, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + err = w.Add(dir) + if err != nil { + if !os.IsNotExist(err) { + w.Close() + return nil, err + } + if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { + w.Close() + return nil, err + } + err = w.Add(dir) + if err != nil { + w.Close() + return nil, err + } + } + dw := DirWatcher{ + w: w, + C: make(chan struct{}), + isDead: make(chan struct{}), + } + go func() { + ticker := time.NewTicker(d) + dw.C <- struct{}{} + hasEvents := false + for { + select { + case err := <-w.Errors: + ctx.LogE("dir-watch", LEs{{"Dir", dir}}, err, func(les LEs) string { + return "fsnotify error: " + err.Error() + }) + case e := <-w.Events: + ctx.LogD("dir-watch-event", LEs{{"Dir", dir}}, func(les LEs) string { + return fmt.Sprintf("fsnotify event: %v", e) + }) + if e.Op&(fsnotify.Create|fsnotify.Rename) > 0 { + hasEvents = true + } + case <-ticker.C: + if hasEvents { + dw.C <- struct{}{} + hasEvents = false + } + case <-dw.isDead: + w.Close() + ticker.Stop() + close(dw.C) + return + } + } + }() + return &dw, err +} + +func (dw *DirWatcher) Close() { + close(dw.isDead) + for range dw.C { + } +} diff --git a/src/dirwatch_dummy.go b/src/dirwatch_dummy.go new file mode 100644 index 0000000..147eac0 --- /dev/null +++ b/src/dirwatch_dummy.go @@ -0,0 +1,45 @@ +// +build nofsnotify + +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package nncp + +import ( + "time" +) + +type DirWatcher struct { + C chan struct{} + ticker *time.Ticker +} + +func (ctx *Ctx) NewDirWatcher(dir string, d time.Duration) (*DirWatcher, error) { + dw := DirWatcher{C: make(chan struct{}), ticker: time.NewTicker(d)} + go func() { + for range dw.ticker.C { + dw.C <- struct{}{} + } + }() + return &dw, nil +} + +func (dw *DirWatcher) Close() { + dw.ticker.Stop() + for range dw.C { + } +} diff --git a/src/go.mod b/src/go.mod index 5267dd2..bce0675 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,6 +4,7 @@ require ( github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 github.com/dustin/go-humanize v1.0.0 github.com/flynn/noise v1.0.0 + github.com/fsnotify/fsnotify v1.5.1 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/hjson/hjson-go v3.1.0+incompatible github.com/klauspost/compress v1.13.1 @@ -11,7 +12,7 @@ require ( go.cypherpunks.ru/recfile v0.4.3 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 golang.org/x/net v0.0.0-20210614182718-04defd469f4e - golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c + golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b lukechampine.com/blake3 v1.1.5 ) diff --git a/src/go.sum b/src/go.sum index 4caa9b0..05a086e 100644 --- a/src/go.sum +++ b/src/go.sum @@ -4,6 +4,8 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= +github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY= @@ -32,8 +34,9 @@ golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c h1:F1jZWGFhYfh0Ci55sIpILtKKK8p3i2/krTr0H1rg74I= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf h1:2ucpDCmfkl8Bd/FsLtiD653Wf96cW37s+iGx93zsu4k= +golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b h1:9zKuko04nR4gjZ4+DNjHqRlAJqbJETHwiNKDqTfOjfE= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/src/nncp.go b/src/nncp.go index c641668..e6ba597 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -40,7 +40,7 @@ along with this program. If not, see .` const Base32Encoded32Len = 52 var ( - Version string = "7.6.0" + Version string = "7.7.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/sp.go b/src/sp.go index 3de629b..1f2e915 100644 --- a/src/sp.go +++ b/src/sp.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "io" + "log" "os" "path/filepath" "sort" @@ -772,14 +773,21 @@ func (state *SPState) StartWorkers( if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { state.wg.Add(1) go func() { - ticker := time.NewTicker(time.Second) + dw, err := state.Ctx.NewDirWatcher( + filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)), + time.Second, + ) + if err != nil { + state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg) + log.Fatalln(err) + } for { select { case <-state.isDead: + dw.Close() state.wg.Done() - ticker.Stop() return - case <-ticker.C: + case <-dw.C: for _, payload := range state.Ctx.infosOur( state.Node.Id, state.Nice, diff --git a/src/toss.go b/src/toss.go index a398649..9b8a6d1 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1020,6 +1020,13 @@ func (ctx *Ctx) AutoToss( nice uint8, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, ) (chan struct{}, chan bool) { + dw, err := ctx.NewDirWatcher( + filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), + time.Second, + ) + if err != nil { + log.Fatalln(err) + } finish := make(chan struct{}) badCode := make(chan bool) go func() { @@ -1027,14 +1034,14 @@ func (ctx *Ctx) AutoToss( for { select { case <-finish: + dw.Close() badCode <- bad - break - default: + return + case <-dw.C: + bad = !ctx.Toss( + nodeId, TRx, nice, false, + doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad } - time.Sleep(time.Second) - bad = !ctx.Toss( - nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad } }() return finish, badCode -- 2.44.0