From: Sergey Matveev Date: Sun, 15 Jan 2017 12:10:49 +0000 (+0300) Subject: nncp-caller command X-Git-Tag: 0.2^2~18 X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=commitdiff_plain;h=919345b4dbaba2fc8f543efb8e0ec99330a775fc nncp-caller command --- diff --git a/common.mk b/common.mk index d3ed006..df20e2e 100644 --- a/common.mk +++ b/common.mk @@ -13,6 +13,7 @@ DOCDIR = $(DESTDIR)$(PREFIX)/share/doc/nncp ALL = \ nncp-mail \ nncp-call \ + nncp-caller \ nncp-check \ nncp-daemon \ nncp-file \ @@ -29,6 +30,9 @@ all: $(ALL) nncp-call: GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-call +nncp-caller: + GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-caller + nncp-check: GOPATH=$(GOPATH) go build -ldflags "$(LDFLAGS)" cypherpunks.ru/nncp/cmd/nncp-check diff --git a/doc/call.texi b/doc/call.texi new file mode 100644 index 0000000..a0fe033 --- /dev/null +++ b/doc/call.texi @@ -0,0 +1,173 @@ +@node Call +@unnumbered Call configuration + +Call is a rule when and how node can be called. + +Example list of call structures: + +@verbatim +calls: + - + cron: "*/1 * * * 0-4" + onlinedeadline: 3600 + nice: 64 + - + cron: "30 * * * 5-6" + onlinedeadline: 10 + nice: 64 + - + cron: "0 * * * 5-6" + xx: rx + addr: lan +@end verbatim + +tells that on work days of the week call that node every minute, +disconnect after an hour of inactivity and process only relatively high +priority packets (presumably mail ones). So we connect and hold +connection for very long time to pass only emails. On weekends call that +node only each half-hour for processing high-priority packets and +quickly disconnect. Also only on weekends try to connect to that node +every hour only using LAN address and only receiving any (any priority) +packets (assume that low priority huge file transmission are done +additionally via offline connections). + +It contains the following fields (only @emph{cron} is required): + +@table @emph + +@item cron +This is copy-pasted documentation from +@code{github.com/gorhill/cronexpr} library used there. + +@multitable @columnfractions .2 .1 .2 .5 +@headitem Field name @tab Mandatory? @tab Allowed values @tab Allowed special characters + +@item Seconds @tab No @tab 0-59 @tab @verb{|* / , -|} +@item Minutes @tab Yes @tab 0-59 @tab @verb{|* / , -|} +@item Hours @tab Yes @tab 0-23 @tab @verb{|* / , -|} +@item Day of month @tab Yes @tab 1-31 @tab @verb{|* / , - L W|} +@item Month @tab Yes @tab 1-12 or JAN-DEC @tab @verb{|* / , -|} +@item Day of week @tab Yes @tab 0-6 or SUN-SAT @tab @verb{|* / , - L #|} +@item Year @tab No @tab 1970–2099 @tab @verb{|* / , -|} + +@end multitable + +@table @asis + +@item Asterisk (@verb{|*|}) + +The asterisk indicates that the cron expression matches for all values +of the field. E.g., using an asterisk in the 4th field (month) indicates +every month. + +@item Slash (@verb{|/|}) + +Slashes describe increments of ranges. For example @verb{|3-59/15|} in +the minute field indicate the third minute of the hour and every 15 +minutes thereafter. The form @verb{|*/...|} is equivalent to the form +"first-last/...", that is, an increment over the largest possible range +of the field. + +@item Comma (@verb{|,|}) + +Commas are used to separate items of a list. For example, using +@verb{|MON,WED,FRI|} in the 5th field (day of week) means Mondays, +Wednesdays and Fridays. + +@item Hyphen (@verb{|-|}) + +Hyphens define ranges. For example, 2000-2010 indicates every year +between 2000 and 2010 AD, inclusive. + +@item L + +@verb{|L|} stands for "last". When used in the day-of-week field, it +allows you to specify constructs such as "the last Friday" (@verb{|5L|}) +of a given month. In the day-of-month field, it specifies the last day +of the month. + +@item W + +The @verb{|W|} character is allowed for the day-of-month field. This +character is used to specify the business day (Monday-Friday) nearest +the given day. As an example, if you were to specify @verb{|15W|} as the +value for the day-of-month field, the meaning is: "the nearest business +day to the 15th of the month." + +So, if the 15th is a Saturday, the trigger fires on Friday the 14th. If +the 15th is a Sunday, the trigger fires on Monday the 16th. If the 15th +is a Tuesday, then it fires on Tuesday the 15th. However if you specify +@verb{|1W|} as the value for day-of-month, and the 1st is a Saturday, +the trigger fires on Monday the 3rd, as it does not 'jump' over the +boundary of a month's days. + +The @verb{|W|} character can be specified only when the day-of-month is +a single day, not a range or list of days. + +The @verb{|W|} character can also be combined with @verb{|L|}, i.e. +@verb{|LW|} to mean "the last business day of the month." + +@item Hash (@verb{|#|}) + +@verb{|#|} is allowed for the day-of-week field, and must be followed by +a number between one and five. It allows you to specify constructs such +as "the second Friday" of a given month. + +@end table + +Predefined cron expressions: + +@multitable @columnfractions .1 .75 .15 +@headitem Entry @tab Description @tab Equivalent to +@item @verb{|@annually|} @tab + Run once a year at midnight in the morning of January 1 @tab + @verb{|0 0 0 1 1 * *|} +@item @verb{|@yearly|} @tab + Run once a year at midnight in the morning of January 1 @tab + @verb{|0 0 0 1 1 * *|} +@item @verb{|@monthly|} @tab + Run once a month at midnight in the morning of the first of the month @tab + @verb{|0 0 0 1 * * *|} +@item @verb{|@weekly|} @tab + Run once a week at midnight in the morning of Sunday @tab + @verb{|0 0 0 * * 0 *|} +@item @verb{|@daily|} @tab + Run once a day at midnight @tab + @verb{|0 0 0 * * * *|} +@item @verb{|@hourly|} @tab + Run once an hour at the beginning of the hour @tab + @verb{|0 0 * * * * *|} +@end multitable + +@itemize +@item +If only six fields are present, a @verb{|0|} second field is prepended, +that is, @verb{|* * * * * 2013|} internally become +@verb{|0 * * * * * 2013|}. +@item +If only five fields are present, a @verb{|0|} second field is prepended +and a wildcard year field is appended, that is, @verb{|* * * * Mon|} +internally become @verb{|0 * * * * Mon *|}. +@item +Domain for day-of-week field is [0-7] instead of [0-6], 7 being Sunday +(like 0). This to comply with @url{http://linux.die.net/man/5/crontab}. +@end itemize + +@item nice +Optional. Use that @ref{Niceness, niceness} during the call (255 is used +otherwise). + +@item xx +Optional. Either @verb{|rx|} or @verb{|tx|}. Tells only to either to +receive or to transmit data during that call. + +@item addr +Optional. Call only that address, instead of trying all from +@ref{CfgAddrs, @emph{addrs}} configuration option. It can be either key +from @emph{addrs} dictionary, or an ordinary @option{addr:port}. + +@item onlinedeadline +Optional. Override @ref{CfgOnlineDeadline, @emph{onlinedeadline}} +configuration option when calling. + +@end table diff --git a/doc/cfg.texi b/doc/cfg.texi index 28b1e5d..acfdbf7 100644 --- a/doc/cfg.texi +++ b/doc/cfg.texi @@ -39,6 +39,9 @@ neigh: addrs: lan: "[fe80::1234%igb0]:5400" internet: alice.com:3389 + calls: + - + cron: "*/2 * * * *" bob: id: 2IZNP...UYGYA exchpub: WFLMZ...B7NHA @@ -52,6 +55,7 @@ neigh: directory. @strong{log} field contains an absolute path to @ref{Log, log} file. +@anchor{CfgNotify} @strong{notify} section contains notification settings for successfully tossed file and freq packets. Corresponding @strong{from} and @strong{to} fields will substituted in notification email message. @@ -78,15 +82,18 @@ node has the following fields: If present, then node can be online called using @ref{Sync, synchronization protocol}. Contains authentication public key. +@anchor{CfgSendmail} @item sendmail An array containing path to executable and its command line arguments that is called for mail sending. If it is empty, then no mail processing will be performed from that node. +@anchor{CfgIncoming} @item incoming Full path to directory where all file uploads will be saved. May be omitted to forbid file uploading on that node. +@anchor{CfgFreq} @item freq Full path to directory from where file requests will queue files for transmission. May be omitted to forbid freqing from that node. @@ -97,6 +104,7 @@ For example @verb{|[foo,bar]|} means that packet can reach current node by transitioning through @emph{foo} and then @emph{bar} nodes. May be omitted if direct connection exists and no relaying is required. +@anchor{CfgAddrs} @item addrs Dictionary containing known network addresses of the node. Each key is human-readable name of the link/address. Values are @verb{|addr:port|} @@ -104,7 +112,7 @@ pairs pointing to @ref{nncp-daemon}'s listening instance. May be omitted if either no direct connection exists, or @ref{nncp-call} is used with forced address specifying. -@anchor{Onlinedeadline} +@anchor{CfgOnlineDeadline} @item onlinedeadline Online connection deadline of node inactivity in seconds. It is the time connection considered dead after not receiving/sending any packets and @@ -114,4 +122,9 @@ transmitted. This can be set to rather high values to keep connection alive (to reduce handshake overhead and delays), wait for appearing packets ready to send and notifying remote side about their appearance. +@anchor{CfgCalls} +@item calls +List of @ref{Call, call configuration}s. Can be omitted if +@ref{nncp-caller} won't be used to call that node. + @end table diff --git a/doc/cmds.texi b/doc/cmds.texi index c5191f3..b71b202 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -48,11 +48,25 @@ If @option{-rx} option is specified then only inbound packets transmission is performed. If @option{-tx} option is specified, then only outbound transmission is performed. -@option{-onlinedeadline} overrides @ref{Onlinedeadline, +@node nncp-caller +@section nncp-caller + +@verbatim +% nncp-caller [options] [NODE ...] +@end verbatim + +Croned daemon that calls remote nodes from time to time, according to +their @ref{CfgCalls, @emph{calls}} configuration field. + +Optional number of @option{NODE}s tells to call only them, ignoring the +other. Otherwise all nodes with specified @emph{calls} configuration +field will be called. + +@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}} configuration option. Each @option{NODE} can contain several uniquely identified -@option{ADDR}esses in @ref{Configuration, configuration} file. If you do +@option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do not specify the exact one, then all will be tried until the first success. Optionally you can force @option{FORCEADDR} address usage, instead of addresses taken from configuration file. @@ -100,7 +114,7 @@ bind to and listen. @end verbatim Send @file{SRC} file to remote @option{NODE}. @file{DST} specifies -destination file name in remote's @ref{Configuration, incoming} +destination file name in remote's @ref{CfgIncoming, incoming} directory. If this file already exists there, then counter will be appended to it. @@ -108,7 +122,7 @@ This command queues file in @ref{Spool, spool} directory immediately (through the temporary file of course) -- so pay attention that sending 2 GiB file will create 2 GiB outbound encrypted packet. -If @ref{Configuration, notification} is enabled on the remote side for +If @ref{CfgNotify, notification} is enabled on the remote side for file transmissions, then it will sent simple letter after successful file receiving. @@ -120,10 +134,10 @@ file receiving. @end verbatim Send file request to @option{NODE}, asking it to send its @file{SRC} -file from @ref{Configuration, freq} directory to our node under -@file{DST} filename in our @ref{Configuration, incoming} one. +file from @ref{CfgFreq, freq} directory to our node under @file{DST} +filename in our @ref{CfgIncoming, incoming} one. -If @ref{Configuration, notification} is enabled on the remote side for +If @ref{CfgNotify, notification} is enabled on the remote side for file request, then it will sent simple letter after successful file queuing. @@ -145,7 +159,7 @@ Parse @ref{Log, log} file and print out its records in human-readable form. Send mail, that is read from stdin, to @option{NODE} and specified @option{USER}s. Mail message will be compressed. After receiving, remote -side will execute specified @ref{Configuration, sendmail} command with +side will execute specified @ref{CfgSendmail, sendmail} command with @option{USER}s appended as a command line argument and feed decompressed mail body to that command's stdin. diff --git a/doc/index.texi b/doc/index.texi index 709c5ec..64ec8de 100644 --- a/doc/index.texi +++ b/doc/index.texi @@ -62,6 +62,7 @@ integration with current SMTP servers are the reasons. * Workflow:: * Installation:: * Configuration:: +* Call configuration: Call. * Commands:: * Niceness:: * Spool directory: Spool. @@ -77,6 +78,7 @@ integration with current SMTP servers are the reasons. @include workflow.texi @include install.texi @include cfg.texi +@include call.texi @include cmds.texi @include niceness.texi @include spool.texi diff --git a/doc/sp.texi b/doc/sp.texi index 21e7b9e..282a1b9 100644 --- a/doc/sp.texi +++ b/doc/sp.texi @@ -165,7 +165,7 @@ delete @file{.part} suffix from file's name and send @emph{DONE} packet. are queued. @item Each second node check are there any new @emph{tx} packets appeared and queues corresponding @emph{INFO} packets. -@item If no packets are sent and received during @ref{Onlinedeadline, +@item If no packets are sent and received during @ref{CfgOnlineDeadline, onlinedeadline} duration, then close the connection. There is no explicit indication that session is over. @end enumerate diff --git a/doc/workflow.texi b/doc/workflow.texi index dcb5458..9a67094 100644 --- a/doc/workflow.texi +++ b/doc/workflow.texi @@ -20,7 +20,8 @@ many times any time as you wish. @itemize @item run @ref{nncp-daemon} to accept remotely initiated connections to your node - @item run @ref{nncp-call} to initiate connection to required nodes + @item run @ref{nncp-caller} to initiate connection to required nodes + from time to time @item use @ref{nncp-xfer} with removable storage devices for copying packets for/from other nodes @end itemize diff --git a/src/cypherpunks.ru/nncp/call.go b/src/cypherpunks.ru/nncp/call.go new file mode 100644 index 0000000..fa1aff7 --- /dev/null +++ b/src/cypherpunks.ru/nncp/call.go @@ -0,0 +1,57 @@ +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2017 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package nncp + +import ( + "net" + "strconv" +) + +func (ctx *Ctx) CallNode(node *Node, addrs []string, nice uint8, xxOnly *TRxTx, onlineDeadline int) (isGood bool) { + for _, addr := range addrs { + sds := SDS{"node": node.Id, "addr": addr} + ctx.LogD("call", sds, "dialing") + conn, err := net.Dial("tcp", addr) + if err != nil { + ctx.LogD("call", SdsAdd(sds, SDS{"err": err}), "dialing") + continue + } + ctx.LogD("call", sds, "connected") + state, err := ctx.StartI(conn, node.Id, nice, xxOnly, onlineDeadline) + if err == nil { + ctx.LogI("call-start", sds, "connected") + state.Wait() + ctx.LogI("call-finish", SDS{ + "node": state.Node.Id, + "duration": strconv.FormatInt(int64(state.Duration.Seconds()), 10), + "rxbytes": strconv.FormatInt(state.RxBytes, 10), + "txbytes": strconv.FormatInt(state.TxBytes, 10), + "rxspeed": strconv.FormatInt(state.RxSpeed, 10), + "txspeed": strconv.FormatInt(state.TxSpeed, 10), + }, "") + isGood = true + conn.Close() + break + } else { + ctx.LogE("call-start", SdsAdd(sds, SDS{"err": err}), "") + conn.Close() + } + } + return +} diff --git a/src/cypherpunks.ru/nncp/cfg.go b/src/cypherpunks.ru/nncp/cfg.go index a10c670..10dd46b 100644 --- a/src/cypherpunks.ru/nncp/cfg.go +++ b/src/cypherpunks.ru/nncp/cfg.go @@ -23,6 +23,7 @@ import ( "os" "path" + "github.com/gorhill/cronexpr" "golang.org/x/crypto/ed25519" "gopkg.in/yaml.v2" ) @@ -42,15 +43,24 @@ type NodeYAML struct { SignPub string NoisePub *string `noisepub,omitempty` Sendmail []string - Incoming *string `incoming,omitempty` - Freq *string `freq,omitempty` - Via []string `via,omitempty` + Incoming *string `incoming,omitempty` + Freq *string `freq,omitempty` + Via []string `via,omitempty` + Calls []CallYAML `calls,omitempty` Addrs map[string]string `addrs,omitempty` OnlineDeadline *int `onlinedeadline,omitempty` } +type CallYAML struct { + Cron string + Nice *int `nice,omitempty` + Xx *string `xx,omitempty` + Addr *string `addr,omitempty` + OnlineDeadline *int `onlinedeadline,omitempty` +} + type NodeOurYAML struct { Id string ExchPub string @@ -131,6 +141,62 @@ func NewNode(name string, yml NodeYAML) (*Node, error) { freq = &fr } + defOnlineDeadline := int(DefaultDeadline) + if yml.OnlineDeadline != nil { + if *yml.OnlineDeadline <= 0 { + return nil, errors.New("OnlineDeadline must be at least 1 second") + } + defOnlineDeadline = *yml.OnlineDeadline + } + + var calls []*Call + for _, callYml := range yml.Calls { + expr, err := cronexpr.Parse(callYml.Cron) + if err != nil { + return nil, err + } + nice := uint8(255) + if callYml.Nice != nil { + if *callYml.Nice < 1 || *callYml.Nice > 255 { + return nil, errors.New("Nice must be between 1 and 255") + } + nice = uint8(*callYml.Nice) + } + var xx TRxTx + if callYml.Xx != nil { + switch *callYml.Xx { + case "rx": + xx = TRx + case "tx": + xx = TTx + default: + return nil, errors.New("xx field must be either \"rx\" or \"tx\"") + } + } + var addr *string + if callYml.Addr != nil { + if a, exists := yml.Addrs[*callYml.Addr]; exists { + addr = &a + } else { + addr = callYml.Addr + } + } + onlineDeadline := defOnlineDeadline + if callYml.OnlineDeadline != nil { + if *yml.OnlineDeadline <= 0 { + return nil, errors.New("OnlineDeadline must be at least 1 second") + } + onlineDeadline = *callYml.OnlineDeadline + } + calls = append(calls, &Call{ + Cron: expr, + Nice: nice, + Xx: &xx, + Addr: addr, + OnlineDeadline: onlineDeadline, + }) + } + node := Node{ Name: name, Id: nodeId, @@ -139,20 +205,15 @@ func NewNode(name string, yml NodeYAML) (*Node, error) { Sendmail: yml.Sendmail, Incoming: incoming, Freq: freq, + Calls: calls, Addrs: yml.Addrs, - OnlineDeadline: DefaultDeadline, + OnlineDeadline: defOnlineDeadline, } copy(node.ExchPub[:], exchPub) if len(noisePub) > 0 { node.NoisePub = new([32]byte) copy(node.NoisePub[:], noisePub) } - if yml.OnlineDeadline != nil { - if *yml.OnlineDeadline <= 0 { - return nil, errors.New("OnlineDeadline must be at least 1 second") - } - node.OnlineDeadline = *yml.OnlineDeadline - } return &node, nil } diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go index 771582b..a087544 100644 --- a/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go +++ b/src/cypherpunks.ru/nncp/cmd/nncp-call/main.go @@ -24,9 +24,7 @@ import ( "fmt" "io/ioutil" "log" - "net" "os" - "strconv" "strings" "cypherpunks.ru/nncp" @@ -95,8 +93,8 @@ func main() { log.Fatalln("Node does not have online communication capability") } - if *onlineDeadline > 0 { - node.OnlineDeadline = *onlineDeadline + if *onlineDeadline == 0 { + onlineDeadline = &node.OnlineDeadline } var xxOnly nncp.TRxTx @@ -121,36 +119,7 @@ func main() { } } - isGood := false - for _, addr := range addrs { - ctx.LogD("call", nncp.SDS{"addr": addr}, "dialing") - conn, err := net.Dial("tcp", addr) - if err != nil { - log.Println("Can not connect:", err) - continue - } - ctx.LogD("call", nncp.SDS{"addr": addr}, "connected") - state, err := ctx.StartI(conn, node.Id, nice, &xxOnly) - if err == nil { - ctx.LogI("call-start", nncp.SDS{"node": state.Node.Id}, "connected") - state.Wait() - ctx.LogI("call-finish", nncp.SDS{ - "node": state.Node.Id, - "duration": strconv.FormatInt(int64(state.Duration.Seconds()), 10), - "rxbytes": strconv.FormatInt(state.RxBytes, 10), - "txbytes": strconv.FormatInt(state.TxBytes, 10), - "rxspeed": strconv.FormatInt(state.RxSpeed, 10), - "txspeed": strconv.FormatInt(state.TxSpeed, 10), - }, "") - isGood = true - conn.Close() - break - } else { - ctx.LogE("call-start", nncp.SDS{"node": state.Node.Id, "err": err}, "") - conn.Close() - } - } - if !isGood { + if !ctx.CallNode(node, addrs, nice, &xxOnly, *onlineDeadline) { os.Exit(1) } } diff --git a/src/cypherpunks.ru/nncp/cmd/nncp-caller/main.go b/src/cypherpunks.ru/nncp/cmd/nncp-caller/main.go new file mode 100644 index 0000000..24a1a80 --- /dev/null +++ b/src/cypherpunks.ru/nncp/cmd/nncp-caller/main.go @@ -0,0 +1,138 @@ +/* +NNCP -- Node to Node copy, utilities for store-and-forward data exchange +Copyright (C) 2016-2017 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +// Croned NNCP TCP daemon caller +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "os" + "strconv" + "sync" + "time" + + "cypherpunks.ru/nncp" +) + +func usage() { + fmt.Fprintf(os.Stderr, nncp.UsageHeader()) + fmt.Fprintln(os.Stderr, "nncp-caller -- croned NNCP TCP daemon caller\n") + fmt.Fprintf(os.Stderr, "Usage: %s [options] [NODE ...]\n", os.Args[0]) + fmt.Fprintln(os.Stderr, "Options:") + flag.PrintDefaults() +} + +func main() { + var ( + cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") + quiet = flag.Bool("quiet", false, "Print only errors") + debug = flag.Bool("debug", false, "Print debug messages") + version = flag.Bool("version", false, "Print version information") + warranty = flag.Bool("warranty", false, "Print warranty information") + ) + flag.Usage = usage + flag.Parse() + if *warranty { + fmt.Println(nncp.Warranty) + return + } + if *version { + fmt.Println(nncp.VersionGet()) + return + } + + cfgRaw, err := ioutil.ReadFile(nncp.CfgPathFromEnv(cfgPath)) + if err != nil { + log.Fatalln("Can not read config:", err) + } + ctx, err := nncp.CfgParse(cfgRaw) + if err != nil { + log.Fatalln("Can not parse config:", err) + } + ctx.Quiet = *quiet + ctx.Debug = *debug + + var nodes []*nncp.Node + if flag.NArg() > 0 { + for _, nodeId := range flag.Args() { + node, err := ctx.FindNode(nodeId) + if err != nil { + log.Fatalln("Invalid NODE specified:", err) + } + if len(node.Calls) == 0 { + ctx.LogD("caller", nncp.SDS{"node": node.Id}, "has no calls, skipping") + continue + } + nodes = append(nodes, node) + } + } else { + for _, node := range ctx.Neigh { + if len(node.Calls) == 0 { + ctx.LogD("caller", nncp.SDS{"node": node.Id}, "has no calls, skipping") + continue + } + nodes = append(nodes, node) + } + } + + var wg sync.WaitGroup + for _, node := range nodes { + for i, call := range node.Calls { + wg.Add(1) + go func(node *nncp.Node, i int, call *nncp.Call) { + defer wg.Done() + var addrs []string + if call.Addr == nil { + for _, addr := range node.Addrs { + addrs = append(addrs, addr) + } + } else { + addrs = append(addrs, *call.Addr) + } + sds := nncp.SDS{"node": node.Id, "callindex": strconv.Itoa(i)} + for { + n := time.Now() + t := call.Cron.Next(n) + ctx.LogD("caller", sds, t.String()) + if t.IsZero() { + ctx.LogE("caller", sds, "got zero time") + return + } + time.Sleep(t.Sub(n)) + node.Lock() + if node.Busy { + node.Unlock() + ctx.LogD("caller", sds, "busy") + continue + } else { + node.Busy = true + node.Unlock() + ctx.CallNode(node, addrs, call.Nice, call.Xx, call.OnlineDeadline) + node.Lock() + node.Busy = false + node.Unlock() + } + } + }(node, i, call) + } + } + wg.Wait() +} diff --git a/src/cypherpunks.ru/nncp/node.go b/src/cypherpunks.ru/nncp/node.go index 5edd5ea..1e1b5ba 100644 --- a/src/cypherpunks.ru/nncp/node.go +++ b/src/cypherpunks.ru/nncp/node.go @@ -21,8 +21,10 @@ package nncp import ( "crypto/rand" "errors" + "sync" "github.com/flynn/noise" + "github.com/gorhill/cronexpr" "golang.org/x/crypto/blake2b" "golang.org/x/crypto/ed25519" "golang.org/x/crypto/nacl/box" @@ -46,6 +48,10 @@ type Node struct { Via []*NodeId Addrs map[string]string OnlineDeadline int + Calls []*Call + + Busy bool + sync.Mutex } type NodeOur struct { @@ -58,6 +64,14 @@ type NodeOur struct { NoisePrv *[32]byte } +type Call struct { + Cron *cronexpr.Expression + Nice uint8 + Xx *TRxTx + Addr *string + OnlineDeadline int +} + func NewNodeGenerate() (*NodeOur, error) { exchPub, exchPrv, err := box.GenerateKey(rand.Reader) if err != nil { diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 3200ea4..82af817 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -157,35 +157,36 @@ func payloadsSplit(payloads [][]byte) [][]byte { } type SPState struct { - ctx *Ctx - Node *Node - nice uint8 - hs *noise.HandshakeState - csOur *noise.CipherState - csTheir *noise.CipherState - payloads chan []byte - infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]struct{} - queueTheir []*SPFreq - wg sync.WaitGroup - RxBytes int64 - RxLastSeen time.Time - TxBytes int64 - TxLastSeen time.Time - started time.Time - Duration time.Duration - RxSpeed int64 - TxSpeed int64 - rxLock *os.File - txLock *os.File - xxOnly *TRxTx - isDead bool + ctx *Ctx + Node *Node + onlineDeadline int + nice uint8 + hs *noise.HandshakeState + csOur *noise.CipherState + csTheir *noise.CipherState + payloads chan []byte + infosTheir map[[32]byte]*SPInfo + infosOurSeen map[[32]byte]struct{} + queueTheir []*SPFreq + wg sync.WaitGroup + RxBytes int64 + RxLastSeen time.Time + TxBytes int64 + TxLastSeen time.Time + started time.Time + Duration time.Duration + RxSpeed int64 + TxSpeed int64 + rxLock *os.File + txLock *os.File + xxOnly *TRxTx + isDead bool sync.RWMutex } func (state *SPState) NotAlive() bool { now := time.Now() - return state.isDead || (int(now.Sub(state.RxLastSeen).Seconds()) >= state.Node.OnlineDeadline && int(now.Sub(state.TxLastSeen).Seconds()) >= state.Node.OnlineDeadline) + return state.isDead || (int(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && int(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline) } func (state *SPState) dirUnlock() { @@ -256,7 +257,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{} return payloadsSplit(payloads) } -func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) { +func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, onlineDeadline int) (*SPState, error) { err := ctx.ensureRxDir(nodeId) if err != nil { return nil, err @@ -288,17 +289,18 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) PeerStatic: node.NoisePub[:], } state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - Node: node, - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - infosOurSeen: make(map[[32]byte]struct{}), - started: started, - rxLock: rxLock, - txLock: txLock, - xxOnly: xxOnly, + ctx: ctx, + hs: noise.NewHandshakeState(conf), + Node: node, + onlineDeadline: onlineDeadline, + nice: nice, + payloads: make(chan []byte), + infosTheir: make(map[[32]byte]*SPInfo), + infosOurSeen: make(map[[32]byte]struct{}), + started: started, + rxLock: rxLock, + txLock: txLock, + xxOnly: xxOnly, } var infosPayloads [][]byte @@ -399,6 +401,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro return nil, errors.New("Unknown peer: " + peerId) } state.Node = node + state.onlineDeadline = node.OnlineDeadline sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))} if ctx.ensureRxDir(node.Id); err != nil {