From: Sergey Matveev Date: Sat, 23 Jan 2021 19:22:26 +0000 (+0300) Subject: recfile log format X-Git-Tag: v6.0.0^2 X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=commitdiff_plain;h=ff2139ccf41d72a3c1c4b56c2106effd1ef2e841 recfile log format --- diff --git a/doc/cmds.texi b/doc/cmds.texi index e7e21ea..6510ed8 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -398,7 +398,8 @@ queuing. $ nncp-log [options] @end example -Parse @ref{Log, log} file and print out its records in human-readable form. +Parse @ref{Log, log} file and print out its records in short +human-readable form. @node nncp-pkt @section nncp-pkt diff --git a/doc/log.texi b/doc/log.texi index 9bcff52..3c91f61 100644 --- a/doc/log.texi +++ b/doc/log.texi @@ -1,35 +1,8 @@ @node Log @unnumbered Log format -Log is a plaintext file with single log entry per line. Lines are "\n" -separated. It is not intended to be read by human -- use @ref{nncp-log} -utility. - -Each line has the following format: - -@verbatim -LEVEL | DATETIME | SD | MSG -@end verbatim - -Example log records: - -@verbatim -I 2017-01-09T08:41:54.751732131Z [nncp-xfer node="VHMTRWDOXPLK7BR55ICZ5N32ZJUMRKZEMFNGGCEAXV66GG43PEBQ" pkt="KMG6FO5UNEK7HWVFJPWQYC7MOZ76KEZ4FWCGM62PWA2QE5755NPA" size="4162548" xx="tx"] -I 2017-01-09T08:42:18.990005394Z [sp-infos node="BYRRQUULEHINPKEFN7CHMSHR5I5CK7PMX5HQNCYERTBAR4BOCG6Q" pkts="0" size="0" xx="tx"] -I 2017-01-09T08:48:59.264847401Z [call-finish duration="10" node="BYRRQUULEHINPKEFN7CHMSHR5I5CK7PMX5HQNCYERTBAR4BOCG6Q" rxbytes="60" rxspeed="60" txbytes="108" txspeed="108"] -@end verbatim - -@table @emph -@item | - Space character. -@item LEVEL - Is single character log level. As a rule is is either @verb{|I|} - (informational message), or @verb{|E|} (error message). -@item DATETIME - UTC datetime in @url{https://tools.ietf.org/html/rfc339, RFC 3339} - @verb{|2006-01-02T15:04:05.999999999Z|} format. -@item SD - Structured data as in @url{https://tools.ietf.org/html/rfc5424, RFC 5424}. -@item MSG - Arbitrary UTF-8 encoded text data. -@end table +Log is a plaintext file consisting of +@url{https://www.gnu.org/software/recutils/, recfile} records. It can be +read by human, but it is better to use either @ref{nncp-log}, or +@command{recutils} utilities for selecting and formatting the required +fields. diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 6c42a2a..d758060 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,10 +1,16 @@ @node Новости @section Новости -@node Релиз 5.7.0 -@subsection Релиз 5.7.0 +@node Релиз 6.0.0 +@subsection Релиз 6.0.0 @itemize +@item +Журнал использует человеко-читаемый и легко обрабатываемый машиной +@url{https://www.gnu.org/software/recutils/, recfile} формат для своих +записей, вместо структурированных строчек RFC 3339. Старый формат +журналов не поддерживается @command{nncp-log}. + @item Работоспособность @option{-autotoss*} опции с @option{-inetd} режимом @command{nncp-daemon}. diff --git a/doc/news.texi b/doc/news.texi index ebd48da..866fb53 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,10 +3,15 @@ See also this page @ref{Новости, on russian}. -@node Release 5.7.0 -@section Release 5.7.0 +@node Release 6.0.0 +@section Release 6.0.0 @itemize +@item Log uses human readable and easy machine parseable +@url{https://www.gnu.org/software/recutils/, recfile} format for the +records, instead of structured RFC 3339 lines. Old logs are not readable +by @command{nncp-log} anymore. + @item @option{-autotoss*} option workability with @command{nncp-daemon}'s @option{-inetd} mode. diff --git a/src/call.go b/src/call.go index 1635b30..ef4d07d 100644 --- a/src/call.go +++ b/src/call.go @@ -54,8 +54,8 @@ func (ctx *Ctx) CallNode( onlyPkts map[[32]byte]bool, ) (isGood bool) { for _, addr := range addrs { - sds := SDS{"node": node.Id, "addr": addr} - ctx.LogD("call", sds, "dialing") + les := LEs{{"Node", node.Id}, {"Addr", addr}} + ctx.LogD("call", les, "dialing") var conn ConnDeadlined var err error if addr[0] == '|' { @@ -64,10 +64,10 @@ func (ctx *Ctx) CallNode( conn, err = net.Dial("tcp", addr) } if err != nil { - ctx.LogD("call", SdsAdd(sds, SDS{"err": err}), "dialing") + ctx.LogD("call", append(les, LE{"Err", err}), "dialing") continue } - ctx.LogD("call", sds, "connected") + ctx.LogD("call", les, "connected") state := SPState{ Ctx: ctx, Node: node, @@ -81,21 +81,21 @@ func (ctx *Ctx) CallNode( onlyPkts: onlyPkts, } if err = state.StartI(conn); err == nil { - ctx.LogI("call-start", sds, "connected") + ctx.LogI("call-start", les, "connected") state.Wait() - ctx.LogI("call-finish", SDS{ - "node": state.Node.Id, - "duration": int64(state.Duration.Seconds()), - "rxbytes": state.RxBytes, - "txbytes": state.TxBytes, - "rxspeed": state.RxSpeed, - "txspeed": state.TxSpeed, + ctx.LogI("call-finish", LEs{ + {"Node", state.Node.Id}, + {"Duration", int64(state.Duration.Seconds())}, + {"RxBytes", state.RxBytes}, + {"TxBytes", state.TxBytes}, + {"RxSpeed", state.RxSpeed}, + {"TxSpeed", state.TxSpeed}, }, "") isGood = true conn.Close() // #nosec G104 break } else { - ctx.LogE("call-start", sds, err, "") + ctx.LogE("call-start", les, err, "") conn.Close() // #nosec G104 } } diff --git a/src/check.go b/src/check.go index 4a1be88..731e869 100644 --- a/src/check.go +++ b/src/check.go @@ -27,12 +27,12 @@ import ( "golang.org/x/crypto/blake2b" ) -func Check(src io.Reader, checksum []byte, sds SDS, showPrgrs bool) (bool, error) { +func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error) { hsh, err := blake2b.New256(nil) if err != nil { log.Fatalln(err) } - if _, err = CopyProgressed(hsh, bufio.NewReader(src), "check", sds, showPrgrs); err != nil { + if _, err = CopyProgressed(hsh, bufio.NewReader(src), "check", les, showPrgrs); err != nil { return false, err } return bytes.Compare(hsh.Sum(nil), checksum) == 0, nil @@ -41,21 +41,21 @@ func Check(src io.Reader, checksum []byte, sds SDS, showPrgrs bool) (bool, error func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool { isBad := false for job := range ctx.Jobs(nodeId, xx) { - sds := SDS{ - "xx": string(xx), - "node": nodeId, - "pkt": Base32Codec.EncodeToString(job.HshValue[:]), - "fullsize": job.Size, + les := LEs{ + {"XX", string(xx)}, + {"Node", nodeId}, + {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])}, + {"FullSize", job.Size}, } - gut, err := Check(job.Fd, job.HshValue[:], sds, ctx.ShowPrgrs) + gut, err := Check(job.Fd, job.HshValue[:], les, ctx.ShowPrgrs) job.Fd.Close() // #nosec G104 if err != nil { - ctx.LogE("check", sds, err, "") + ctx.LogE("check", les, err, "") return true } if !gut { isBad = true - ctx.LogE("check", sds, errors.New("bad"), "") + ctx.LogE("check", les, errors.New("bad"), "") } } return isBad diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index ae14e89..0dbe3ba 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -114,19 +114,21 @@ func main() { ctx.Umask() - sds := nncp.SDS{} if *doTx { - sds["xx"] = string(nncp.TTx) var pktName string bufStdout := bufio.NewWriter(os.Stdout) tarWr := tar.NewWriter(bufStdout) - for nodeId, _ := range nodeIds { - sds["node"] = nodeId.String() + for nodeId := range nodeIds { + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId.String()}, + {K: "Pkt", V: "dummy"}, + } for job := range ctx.Jobs(&nodeId, nncp.TTx) { pktName = filepath.Base(job.Fd.Name()) - sds["pkt"] = pktName + les[len(les)-1].V = pktName if job.PktEnc.Nice > nice { - ctx.LogD("nncp-bundle", sds, "too nice") + ctx.LogD("nncp-bundle", les, "too nice") job.Fd.Close() // #nosec G104 continue } @@ -154,10 +156,10 @@ func main() { } if _, err = nncp.CopyProgressed( tarWr, job.Fd, "Tx", - nncp.SdsAdd(sds, nncp.SDS{ - "pkt": nncp.Base32Codec.EncodeToString(job.HshValue[:]), - "fullsize": job.Size, - }), + append(les, nncp.LEs{ + {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])}, + {K: "FullSize", V: job.Size}, + }...), ctx.ShowPrgrs, ); err != nil { log.Fatalln("Error during copying to tar:", err) @@ -174,7 +176,7 @@ func main() { log.Fatalln("Error during deletion:", err) } } - ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"size": job.Size}), "") + ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "") } } if err = tarWr.Close(); err != nil { @@ -201,20 +203,23 @@ func main() { panic(err) } tarR := tar.NewReader(bufStdin) - sds["xx"] = string(nncp.TRx) entry, err := tarR.Next() if err != nil { if err != io.EOF { ctx.LogD( "nncp-bundle", - nncp.SdsAdd(sds, nncp.SDS{"err": err}), + nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, "error reading tar", ) } continue } if entry.Typeflag != tar.TypeDir { - ctx.LogD("nncp-bundle", sds, "Expected NNCP/") + ctx.LogD( + "nncp-bundle", + nncp.LEs{{K: "XX", V: string(nncp.TRx)}}, + "Expected NNCP/", + ) continue } entry, err = tarR.Next() @@ -222,61 +227,58 @@ func main() { if err != io.EOF { ctx.LogD( "nncp-bundle", - nncp.SdsAdd(sds, nncp.SDS{"err": err}), + nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, "error reading tar", ) } continue } - sds["pkt"] = entry.Name + les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}} if entry.Size < nncp.PktEncOverhead { - ctx.LogD("nncp-bundle", sds, "Too small packet") + ctx.LogD("nncp-bundle", les, "Too small packet") continue } if !ctx.IsEnoughSpace(entry.Size) { - ctx.LogE("nncp-bundle", sds, errors.New("not enough spool space"), "") + ctx.LogE("nncp-bundle", les, errors.New("not enough spool space"), "") continue } pktName := filepath.Base(entry.Name) if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil { - ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": "bad packet name"}), "") + ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: "bad packet name"}), "") continue } if _, err = io.ReadFull(tarR, pktEncBuf); err != nil { - ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: err}), "read") continue } if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil { - ctx.LogD("nncp-bundle", sds, "Bad packet structure") + ctx.LogD("nncp-bundle", les, "Bad packet structure") continue } if pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-bundle", sds, "Bad packet magic number") + ctx.LogD("nncp-bundle", les, "Bad packet magic number") continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-bundle", sds, "too nice") + ctx.LogD("nncp-bundle", les, "too nice") continue } if *pktEnc.Sender == *ctx.SelfId && *doDelete { if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Recipient]; !exists { - ctx.LogD("nncp-bundle", sds, "Recipient is not requested") + ctx.LogD("nncp-bundle", les, "Recipient is not requested") continue } } nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:]) - sds["xx"] = string(nncp.TTx) - sds["node"] = nodeId32 - sds["pkt"] = pktName - dstPath := filepath.Join( - ctx.Spool, - nodeId32, - string(nncp.TTx), - pktName, - ) + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId32}, + {K: "Pkt", V: pktName}, + } + dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName) if _, err = os.Stat(dstPath); err != nil { - ctx.LogD("nncp-bundle", sds, "Packet is already missing") + ctx.LogD("nncp-bundle", les, "Packet is already missing") continue } hsh, err := blake2b.New256(nil) @@ -288,43 +290,46 @@ func main() { } if _, err = nncp.CopyProgressed( hsh, tarR, "Rx", - nncp.SdsAdd(sds, nncp.SDS{"fullsize": entry.Size}), + append(les, nncp.LE{K: "FullSize", V: entry.Size}), ctx.ShowPrgrs, ); err != nil { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { - ctx.LogI("nncp-bundle", sds, "removed") + ctx.LogI("nncp-bundle", les, "removed") if !*dryRun { os.Remove(dstPath) // #nosec G104 } } else { - ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "") + ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") } continue } if *pktEnc.Recipient != *ctx.SelfId { - ctx.LogD("nncp-bundle", sds, "Unknown recipient") + ctx.LogD("nncp-bundle", les, "Unknown recipient") continue } if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Sender]; !exists { - ctx.LogD("nncp-bundle", sds, "Sender is not requested") + ctx.LogD("nncp-bundle", les, "Sender is not requested") continue } } sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:]) - sds["node"] = sender - sds["pkt"] = pktName - sds["fullsize"] = entry.Size + les = nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Node", V: sender}, + {K: "Pkt", V: pktName}, + {K: "FullSize", V: entry.Size}, + } dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx)) dstPath := filepath.Join(dstDirPath, pktName) if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", sds, "Packet already exists") + ctx.LogD("nncp-bundle", les, "Packet already exists") continue } if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", sds, "Packet already exists") + ctx.LogD("nncp-bundle", les, "Packet already exists") continue } if *doCheck { @@ -336,11 +341,11 @@ func main() { if _, err = hsh.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } - if _, err = nncp.CopyProgressed(hsh, tarR, "check", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName { - ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "") + ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") continue } } else { @@ -351,7 +356,7 @@ func main() { if _, err = tmp.W.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } - if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if err = tmp.W.Flush(); err != nil { @@ -362,14 +367,14 @@ func main() { log.Fatalln("Error during commiting:", err) } } else { - ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "") + ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") tmp.Cancel() continue } } } else { if *dryRun { - if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } } else { @@ -381,7 +386,7 @@ func main() { if _, err = bufTmp.Write(pktEncBuf); err != nil { log.Fatalln("Error during writing:", err) } - if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", sds, ctx.ShowPrgrs); err != nil { + if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if err = bufTmp.Flush(); err != nil { @@ -404,9 +409,13 @@ func main() { } } } - ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{ - "size": sds["fullsize"], - }), "") + for _, le := range les { + if le.K == "FullSize" { + les = append(les, nncp.LE{K: "Size", V: le.V}) + break + } + } + ctx.LogI("nncp-bundle", les, "") } } } diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index f7ab13c..b702b49 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -86,7 +86,7 @@ func main() { log.Fatalln("Invalid NODE specified:", err) } if len(node.Calls) == 0 { - ctx.LogD("caller", nncp.SDS{"node": node.Id}, "has no calls, skipping") + ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping") continue } nodes = append(nodes, node) @@ -94,7 +94,7 @@ func main() { } else { for _, node := range ctx.Neigh { if len(node.Calls) == 0 { - ctx.LogD("caller", nncp.SDS{"node": node.Id}, "has no calls, skipping") + ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping") continue } nodes = append(nodes, node) @@ -115,27 +115,27 @@ func main() { } else { addrs = append(addrs, *call.Addr) } - sds := nncp.SDS{"node": node.Id, "callindex": i} + les := nncp.LEs{{K: "Node", V: node.Id}, {K: "CallIndex", V: i}} for { n := time.Now() t := call.Cron.Next(n) - ctx.LogD("caller", sds, t.String()) + ctx.LogD("caller", les, t.String()) if t.IsZero() { - ctx.LogE("caller", sds, errors.New("got zero time"), "") + ctx.LogE("caller", les, errors.New("got zero time"), "") return } time.Sleep(t.Sub(n)) node.Lock() if node.Busy { node.Unlock() - ctx.LogD("caller", sds, "busy") + ctx.LogD("caller", les, "busy") continue } else { node.Busy = true node.Unlock() if call.WhenTxExists && call.Xx != "TRx" { - ctx.LogD("caller", sds, "checking tx existence") + ctx.LogD("caller", les, "checking tx existence") txExists := false for job := range ctx.Jobs(node.Id, nncp.TTx) { job.Fd.Close() @@ -145,7 +145,7 @@ func main() { txExists = true } if !txExists { - ctx.LogD("caller", sds, "no tx") + ctx.LogD("caller", les, "no tx") node.Lock() node.Busy = false node.Unlock() diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 7c1ddaa..6319deb 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -77,16 +77,16 @@ func performSP( Nice: nice, } if err := state.StartR(conn); err == nil { - ctx.LogI("call-start", nncp.SDS{"node": state.Node.Id}, "connected") + ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected") nodeIdC <- state.Node.Id state.Wait() - ctx.LogI("call-finish", nncp.SDS{ - "node": state.Node.Id, - "duration": int64(state.Duration.Seconds()), - "rxbytes": state.RxBytes, - "txbytes": state.TxBytes, - "rxspeed": state.RxSpeed, - "txspeed": state.TxSpeed, + ctx.LogI("call-finish", nncp.LEs{ + {K: "Node", V: state.Node.Id}, + {K: "Duration", V: int64(state.Duration.Seconds())}, + {K: "RxBytes", V: state.RxBytes}, + {K: "TxBytes", V: state.TxBytes}, + {K: "RxSpeed", V: state.RxSpeed}, + {K: "TxSpeed", V: state.TxSpeed}, }, "") } else { nodeId := "unknown" @@ -96,7 +96,7 @@ func performSP( nodeIdC <- state.Node.Id nodeId = state.Node.Id.String() } - ctx.LogE("call-start", nncp.SDS{"node": nodeId}, err, "") + ctx.LogI("call-start", nncp.LEs{{K: "Node", V: nodeId}}, "connected") } close(nodeIdC) } @@ -194,7 +194,7 @@ func main() { if err != nil { log.Fatalln("Can not accept connection:", err) } - ctx.LogD("daemon", nncp.SDS{"addr": conn.RemoteAddr()}, "accepted") + ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted") go func(conn net.Conn) { nodeIdC := make(chan *nncp.NodeId) go performSP(ctx, conn, nice, nodeIdC) diff --git a/src/cmd/nncp-log/main.go b/src/cmd/nncp-log/main.go index 7406261..be13856 100644 --- a/src/cmd/nncp-log/main.go +++ b/src/cmd/nncp-log/main.go @@ -19,13 +19,14 @@ along with this program. If not, see . package main import ( - "bufio" "flag" "fmt" + "io" "log" "os" "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/recfile" ) func usage() { @@ -63,15 +64,22 @@ func main() { if err != nil { log.Fatalln("Can not open log:", err) } - scanner := bufio.NewScanner(fd) - for scanner.Scan() { - t := scanner.Text() + r := recfile.NewReader(fd) + for { + le, err := r.NextMap() + if err != nil { + if err == io.EOF { + break + } + log.Fatalln("Can not read log:", err) + } if *debug { - fmt.Println(t) + fmt.Println(le) } - fmt.Println(ctx.Humanize(t)) - } - if err = scanner.Err(); err != nil { - log.Fatalln("Can not read log:", err) + s, err := ctx.Humanize(le) + if err != nil { + s = fmt.Sprintf("Can not humanize: %s\n%s", err, le) + } + fmt.Println(s) } } diff --git a/src/cmd/nncp-reass/main.go b/src/cmd/nncp-reass/main.go index 8278700..40253db 100644 --- a/src/cmd/nncp-reass/main.go +++ b/src/cmd/nncp-reass/main.go @@ -57,19 +57,20 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo log.Fatalln("Can not open file:", err) } var metaPkt nncp.ChunkedMeta + les := nncp.LEs{{K: "Path", V: path}} if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil { - ctx.LogE("nncp-reass", nncp.SDS{"path": path}, err, "bad meta file") + ctx.LogE("nncp-reass", les, err, "bad meta file") return false } fd.Close() // #nosec G104 if metaPkt.Magic != nncp.MagicNNCPMv1 { - ctx.LogE("nncp-reass", nncp.SDS{"path": path}, nncp.BadMagic, "") + ctx.LogE("nncp-reass", les, nncp.BadMagic, "") return false } metaName := filepath.Base(path) if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) { - ctx.LogE("nncp-reass", nncp.SDS{"path": path}, errors.New("invalid filename suffix"), "") + ctx.LogE("nncp-reass", les, errors.New("invalid filename suffix"), "") return false } mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta) @@ -105,8 +106,9 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo allChunksExist := true for chunkNum, chunkPath := range chunksPaths { fi, err := os.Stat(chunkPath) + lesChunk := append(les, nncp.LE{K: "Chunk", V: chunkNum}) if err != nil && os.IsNotExist(err) { - ctx.LogI("nncp-reass", nncp.SDS{"path": path, "chunk": chunkNum}, "missing") + ctx.LogI("nncp-reass", lesChunk, "missing") allChunksExist = false continue } @@ -117,11 +119,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo badSize = uint64(fi.Size()) != metaPkt.ChunkSize } if badSize { - ctx.LogE( - "nncp-reass", - nncp.SDS{"path": path, "chunk": chunkNum}, - errors.New("invalid size"), "", - ) + ctx.LogE("nncp-reass", lesChunk, errors.New("invalid size"), "") allChunksExist = false } } @@ -146,9 +144,9 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo } if _, err = nncp.CopyProgressed( hsh, bufio.NewReader(fd), "check", - nncp.SDS{ - "pkt": chunkPath, - "fullsize": fi.Size(), + nncp.LEs{ + {K: "Pkt", V: chunkPath}, + {K: "FullSize", V: fi.Size()}, }, ctx.ShowPrgrs, ); err != nil { @@ -158,7 +156,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 { ctx.LogE( "nncp-reass", - nncp.SDS{"path": path, "chunk": chunkNum}, + nncp.LEs{{K: "Path", V: path}, {K: "Chunk", V: chunkNum}}, errors.New("checksum is bad"), "", ) allChecksumsGood = false @@ -168,23 +166,22 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo return false } if dryRun { - ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "ready") + ctx.LogI("nncp-reass", nncp.LEs{{K: "path", V: path}}, "ready") return true } var dst io.Writer var tmp *os.File - var sds nncp.SDS if stdout { dst = os.Stdout - sds = nncp.SDS{"path": path} + les = nncp.LEs{{K: "path", V: path}} } else { tmp, err = nncp.TempFile(mainDir, "reass") if err != nil { log.Fatalln(err) } - sds = nncp.SDS{"path": path, "tmp": tmp.Name()} - ctx.LogD("nncp-reass", sds, "created") + les = nncp.LEs{{K: "path", V: path}, {K: "Tmp", V: tmp.Name()}} + ctx.LogD("nncp-reass", les, "created") dst = tmp } dstW := bufio.NewWriter(dst) @@ -201,9 +198,9 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo } if _, err = nncp.CopyProgressed( dstW, bufio.NewReader(fd), "reass", - nncp.SDS{ - "pkt": chunkPath, - "fullsize": fi.Size(), + nncp.LEs{ + {K: "Pkt", V: chunkPath}, + {K: "FullSize", V: fi.Size()}, }, ctx.ShowPrgrs, ); err != nil { @@ -212,7 +209,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo fd.Close() // #nosec G104 if !keep { if err = os.Remove(chunkPath); err != nil { - ctx.LogE("nncp-reass", nncp.SdsAdd(sds, nncp.SDS{"chunk": chunkNum}), err, "") + ctx.LogE("nncp-reass", append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, "") hasErrors = true } } @@ -228,15 +225,15 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo log.Fatalln("Can not close:", err) } } - ctx.LogD("nncp-reass", sds, "written") + ctx.LogD("nncp-reass", les, "written") if !keep { if err = os.Remove(path); err != nil { - ctx.LogE("nncp-reass", sds, err, "") + ctx.LogE("nncp-reass", les, err, "") hasErrors = true } } if stdout { - ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "done") + ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done") return !hasErrors } @@ -259,7 +256,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo if err = nncp.DirSync(mainDir); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "done") + ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done") return !hasErrors } @@ -267,13 +264,13 @@ func findMetas(ctx *nncp.Ctx, dirPath string) []string { dir, err := os.Open(dirPath) defer dir.Close() if err != nil { - ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath}, err, "") + ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "") return nil } fis, err := dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath}, err, "") + ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "") return nil } metaPaths := make([]string, 0) diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index 7afb24b..d5609e6 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -118,10 +118,10 @@ func main() { return nil } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("nncp-rm", nncp.SDS{"file": path}, "too fresh, skipping") + ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") return nil } - ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "") + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil } @@ -141,7 +141,7 @@ func main() { return nil } if strings.HasSuffix(info.Name(), ".lock") { - ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "") + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil } @@ -173,25 +173,25 @@ func main() { return nil } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("nncp-rm", nncp.SDS{"file": path}, "too fresh, skipping") + ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") return nil } if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) { - ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "") + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil } return os.Remove(path) } if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) { - ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "") + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil } return os.Remove(path) } if *pktRaw != "" && filepath.Base(info.Name()) == *pktRaw { - ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "") + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil } @@ -201,7 +201,7 @@ func main() { !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { - ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "") + ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index 50e9bf4..06db800 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -105,32 +105,34 @@ func main() { isBad := false var dir *os.File var fis []os.FileInfo - sds := nncp.SDS{} + var les nncp.LEs if *txOnly { goto Tx } - sds["xx"] = string(nncp.TRx) - sds["dir"] = selfPath - ctx.LogD("nncp-xfer", sds, "self") + les = nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Dir", V: selfPath}, + } + ctx.LogD("nncp-xfer", les, "self") if _, err = os.Stat(selfPath); err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "no dir") + ctx.LogD("nncp-xfer", les, "no dir") goto Tx } - ctx.LogE("nncp-xfer", sds, err, "stat") + ctx.LogE("nncp-xfer", les, err, "stat") isBad = true goto Tx } dir, err = os.Open(selfPath) if err != nil { - ctx.LogE("nncp-xfer", sds, err, "open") + ctx.LogE("nncp-xfer", les, err, "open") isBad = true goto Tx } fis, err = dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", sds, err, "read") + ctx.LogE("nncp-xfer", les, err, "read") isBad = true goto Tx } @@ -139,29 +141,29 @@ func main() { continue } nodeId, err := nncp.NodeIdFromString(fi.Name()) - sds["node"] = fi.Name() + les := append(les, nncp.LE{K: "Node", V: fi.Name()}) if err != nil { - ctx.LogD("nncp-xfer", sds, "is not NodeId") + ctx.LogD("nncp-xfer", les, "is not NodeId") continue } if nodeOnly != nil && *nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", sds, "skip") + ctx.LogD("nncp-xfer", les, "skip") continue } if _, known := ctx.Neigh[*nodeId]; !known { - ctx.LogD("nncp-xfer", sds, "unknown") + ctx.LogD("nncp-xfer", les, "unknown") continue } dir, err = os.Open(filepath.Join(selfPath, fi.Name())) if err != nil { - ctx.LogE("nncp-xfer", sds, err, "open") + ctx.LogE("nncp-xfer", les, err, "open") isBad = true continue } fisInt, err := dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", sds, err, "read") + ctx.LogE("nncp-xfer", les, err, "read") isBad = true continue } @@ -174,29 +176,28 @@ func main() { continue } filename := filepath.Join(dir.Name(), fiInt.Name()) - sds["file"] = filename - delete(sds, "size") + les := append(les, nncp.LE{K: "File", V: filename}) fd, err := os.Open(filename) if err != nil { - ctx.LogE("nncp-xfer", sds, err, "open") + ctx.LogE("nncp-xfer", les, err, "open") isBad = true continue } var pktEnc nncp.PktEnc _, err = xdr.Unmarshal(fd, &pktEnc) if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-xfer", sds, "is not a packet") + ctx.LogD("nncp-xfer", les, "is not a packet") fd.Close() // #nosec G104 continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-xfer", sds, "too nice") + ctx.LogD("nncp-xfer", les, "too nice") fd.Close() // #nosec G104 continue } - sds["size"] = fiInt.Size() + les = append(les, nncp.LE{K: "Size", V: fiInt.Size()}) if !ctx.IsEnoughSpace(fiInt.Size()) { - ctx.LogE("nncp-xfer", sds, errors.New("is not enough space"), "") + ctx.LogE("nncp-xfer", les, errors.New("is not enough space"), "") fd.Close() // #nosec G104 continue } @@ -214,19 +215,19 @@ func main() { err = w.Close() } if err != nil { - ctx.LogE("nncp-xfer", sds, err, "copy") + ctx.LogE("nncp-xfer", les, err, "copy") w.CloseWithError(err) // #nosec G104 } }() if _, err = nncp.CopyProgressed( tmp.W, r, "Rx", - nncp.SdsAdd(sds, nncp.SDS{ - "pkt": filename, - "fullsize": sds["size"], - }), + append(les, nncp.LEs{ + {K: "Pkt", V: filename}, + {K: "FullSize", V: fiInt.Size()}, + }...), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("nncp-xfer", sds, err, "copy") + ctx.LogE("nncp-xfer", les, err, "copy") isBad = true } fd.Close() // #nosec G104 @@ -241,10 +242,10 @@ func main() { )); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-xfer", sds, "") + ctx.LogI("nncp-xfer", les, "") if !*keep { if err = os.Remove(filename); err != nil { - ctx.LogE("nncp-xfer", sds, err, "remove") + ctx.LogE("nncp-xfer", les, err, "remove") isBad = true } } @@ -258,11 +259,13 @@ Tx: } return } - sds["xx"] = string(nncp.TTx) - for nodeId, _ := range ctx.Neigh { - sds["node"] = nodeId + for nodeId := range ctx.Neigh { + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId}, + } if nodeOnly != nil && nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", sds, "skip") + ctx.LogD("nncp-xfer", les, "skip") continue } dirLock, err := ctx.LockDir(&nodeId, string(nncp.TTx)) @@ -270,118 +273,118 @@ Tx: continue } nodePath := filepath.Join(flag.Arg(0), nodeId.String()) - sds["dir"] = nodePath + les = append(les, nncp.LE{K: "Dir", V: nodePath}) _, err = os.Stat(nodePath) if err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "does not exist") + ctx.LogD("nncp-xfer", les, "does not exist") if !*mkdir { ctx.UnlockDir(dirLock) continue } if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", sds, err, "mkdir") + ctx.LogE("nncp-xfer", les, err, "mkdir") isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", sds, err, "stat") + ctx.LogE("nncp-xfer", les, err, "stat") isBad = true continue } } dstPath := filepath.Join(nodePath, ctx.SelfId.String()) - sds["dir"] = dstPath + les[len(les)-1].V = dstPath _, err = os.Stat(dstPath) if err != nil { if os.IsNotExist(err) { if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", sds, err, "mkdir") + ctx.LogE("nncp-xfer", les, err, "mkdir") isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", sds, err, "stat") + ctx.LogE("nncp-xfer", les, err, "stat") isBad = true continue } } - delete(sds, "dir") + les = les[:len(les)-1] for job := range ctx.Jobs(&nodeId, nncp.TTx) { pktName := filepath.Base(job.Fd.Name()) - sds["pkt"] = pktName + les := append(les, nncp.LE{K: "Pkt", V: pktName}) if job.PktEnc.Nice > nice { - ctx.LogD("nncp-xfer", sds, "too nice") + ctx.LogD("nncp-xfer", les, "too nice") job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "already exists") + ctx.LogD("nncp-xfer", les, "already exists") job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", sds, "already exists") + ctx.LogD("nncp-xfer", les, "already exists") job.Fd.Close() // #nosec G104 continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { - ctx.LogE("nncp-xfer", sds, err, "mktemp") + ctx.LogE("nncp-xfer", les, err, "mktemp") job.Fd.Close() // #nosec G104 isBad = true break } - sds["tmp"] = tmp.Name() - ctx.LogD("nncp-xfer", sds, "created") + les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) + ctx.LogD("nncp-xfer", les, "created") bufW := bufio.NewWriter(tmp) copied, err := nncp.CopyProgressed( bufW, bufio.NewReader(job.Fd), "Tx", - nncp.SdsAdd(sds, nncp.SDS{"fullsize": job.Size}), + append(les, nncp.LE{K: "FullSize", V: job.Size}), ctx.ShowPrgrs, ) job.Fd.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", sds, err, "copy") + ctx.LogE("nncp-xfer", les, err, "copy") tmp.Close() // #nosec G104 isBad = true continue } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("nncp-xfer", sds, err, "flush") + ctx.LogE("nncp-xfer", les, err, "flush") isBad = true continue } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("nncp-xfer", sds, err, "sync") + ctx.LogE("nncp-xfer", les, err, "sync") isBad = true continue } if err = tmp.Close(); err != nil { - ctx.LogE("nncp-xfer", sds, err, "sync") + ctx.LogE("nncp-xfer", les, err, "sync") } if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil { - ctx.LogE("nncp-xfer", sds, err, "rename") + ctx.LogE("nncp-xfer", les, err, "rename") isBad = true continue } if err = nncp.DirSync(dstPath); err != nil { - ctx.LogE("nncp-xfer", sds, err, "sync") + ctx.LogE("nncp-xfer", les, err, "sync") isBad = true continue } os.Remove(filepath.Join(dstPath, pktName+".part")) // #nosec G104 - delete(sds, "tmp") - ctx.LogI("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"size": copied}), "") + les = les[:len(les)-1] + ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "") if !*keep { if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("nncp-xfer", sds, err, "remove") + ctx.LogE("nncp-xfer", les, err, "remove") isBad = true } } diff --git a/src/ctx.go b/src/ctx.go index 645483d..ebd3102 100644 --- a/src/ctx.go +++ b/src/ctx.go @@ -65,12 +65,12 @@ func (ctx *Ctx) FindNode(id string) (*Node, error) { func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error { dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx)) if err := os.MkdirAll(dirPath, os.FileMode(0777)); err != nil { - ctx.LogE("dir-ensure", SDS{"dir": dirPath}, err, "") + ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "") return err } fd, err := os.Open(dirPath) if err != nil { - ctx.LogE("dir-ensure", SDS{"dir": dirPath}, err, "") + ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "") return err } return fd.Close() diff --git a/src/go.mod b/src/go.mod index 56c8b2c..2742c74 100644 --- a/src/go.mod +++ b/src/go.mod @@ -9,6 +9,7 @@ require ( github.com/klauspost/compress v1.11.4 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect go.cypherpunks.ru/balloon v1.1.1 + go.cypherpunks.ru/recfile v0.4.3 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/net v0.0.0-20201224014010-6772e930b67b golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 diff --git a/src/go.sum b/src/go.sum index 33b95ab..8ea9c6f 100644 --- a/src/go.sum +++ b/src/go.sum @@ -17,6 +17,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= go.cypherpunks.ru/balloon v1.1.1 h1:ypHM1DRf/XuCrp9pDkTHg00CqZX/Np/APb//iHvDJTA= go.cypherpunks.ru/balloon v1.1.1/go.mod h1:k4s4ozrIrhpBjj78Z7LX8ZHxMQ+XE7DZUWl8gP2ojCo= +go.cypherpunks.ru/recfile v0.4.3 h1:ephokihmV//p0ob6gx2FWXvm28/NBDbWTOJPUNahxO8= +go.cypherpunks.ru/recfile v0.4.3/go.mod h1:sR+KajB+vzofL3SFVFwKt3Fke0FaCcN1g3YPNAhU3qI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= diff --git a/src/humanizer.go b/src/humanizer.go index 78e2fc1..9ea5e99 100644 --- a/src/humanizer.go +++ b/src/humanizer.go @@ -18,100 +18,84 @@ along with this program. If not, see . package nncp import ( + "errors" "fmt" - "regexp" "strconv" "strings" "time" "github.com/dustin/go-humanize" + "go.cypherpunks.ru/recfile" ) -func (ctx *Ctx) Humanize(s string) string { - s = strings.TrimRight(s, "\n") - splitted := strings.SplitN(s, " ", 4) - if len(splitted) != 4 { - return s - } - var level string - if splitted[0] == "E" { - level = "ERROR " - } - when, err := time.Parse(time.RFC3339Nano, splitted[1]) +func (ctx *Ctx) HumanizeRec(rec string) string { + r := recfile.NewReader(strings.NewReader(rec)) + le, err := r.NextMap() if err != nil { - return s + return rec } - who := splitted[2][1:] - closingBracket := strings.LastIndex(splitted[3], "]") - if closingBracket == -1 { - return s - } - rem := strings.Trim(splitted[3][closingBracket+1:], " ") - sds := make(map[string]string) - - re := regexp.MustCompile(`\w+="[^"]+"`) - for _, pair := range re.FindAllString(splitted[3][:closingBracket+1], -1) { - sep := strings.Index(pair, "=") - sds[pair[:sep]] = pair[sep+2 : len(pair)-1] + humanized, err := ctx.Humanize(le) + if err != nil { + return fmt.Sprintf("Can not humanize: %s\n%s", err, rec) } + return humanized +} - nodeS := sds["node"] +func (ctx *Ctx) Humanize(le map[string]string) (string, error) { + nodeS := le["Node"] node, err := ctx.FindNode(nodeS) if err == nil { nodeS = node.Name } var size string - if sizeRaw, exists := sds["size"]; exists { + if sizeRaw, exists := le["Size"]; exists { sp, err := strconv.ParseUint(sizeRaw, 10, 64) if err != nil { - return s + return "", err } size = humanize.IBytes(uint64(sp)) } var msg string - switch who { + switch le["Who"] { case "tx": - switch sds["type"] { + switch le["Type"] { case "file": msg = fmt.Sprintf( "File %s (%s) transfer to %s:%s: %s", - sds["src"], size, nodeS, sds["dst"], rem, + le["Src"], size, nodeS, le["Dst"], le["Msg"], ) case "freq": msg = fmt.Sprintf( "File request from %s:%s to %s: %s", - nodeS, sds["src"], sds["dst"], rem, + nodeS, le["Src"], le["Dst"], le["Msg"], ) case "exec": msg = fmt.Sprintf( "Exec to %s@%s (%s): %s", - nodeS, sds["dst"], size, rem, + nodeS, le["Dst"], size, le["Msg"], ) case "trns": msg = fmt.Sprintf( "Transitional packet to %s (%s) (nice %s): %s", - nodeS, size, sds["nice"], rem, + nodeS, size, le["Nice"], le["Msg"], ) default: - return s + return "", errors.New("unknown \"tx\" type") } - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } case "rx": - switch sds["type"] { + switch le["Type"] { case "exec": - msg = fmt.Sprintf( - "Got exec from %s to %s (%s)", - nodeS, sds["dst"], size, - ) + msg = fmt.Sprintf("Got exec from %s to %s (%s)", nodeS, le["Dst"], size) case "file": - msg = fmt.Sprintf("Got file %s (%s) from %s", sds["dst"], size, nodeS) + msg = fmt.Sprintf("Got file %s (%s) from %s", le["Dst"], size, nodeS) case "freq": - msg = fmt.Sprintf("Got file request %s to %s", sds["src"], nodeS) + msg = fmt.Sprintf("Got file request %s to %s", le["Src"], nodeS) case "trns": - nodeT := sds["dst"] + nodeT := le["Dst"] node, err := ctx.FindNode(nodeT) if err == nil { nodeT = node.Name @@ -121,24 +105,24 @@ func (ctx *Ctx) Humanize(s string) string { nodeS, nodeT, size, ) default: - return s + return "", errors.New("unknown \"rx\" type") } - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } case "check": - msg = fmt.Sprintf("Checking: %s/%s/%s", sds["node"], sds["xx"], sds["pkt"]) - if err, exists := sds["err"]; exists { + msg = fmt.Sprintf("Checking: %s/%s/%s", le["Node"], le["XX"], le["Pkt"]) + if err, exists := le["Err"]; exists { msg += fmt.Sprintf(" %s", err) } case "nncp-xfer": - switch sds["xx"] { + switch le["XX"] { case "rx": msg = "Packet transfer, received from" case "tx": msg = "Packet transfer, sent to" default: - return s + return "", errors.New("unknown XX") } if nodeS != "" { msg += " node " + nodeS @@ -146,53 +130,53 @@ func (ctx *Ctx) Humanize(s string) string { if size != "" { msg += fmt.Sprintf(" (%s)", size) } - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } else { - msg += " " + rem + msg += " " + le["Msg"] } case "nncp-bundle": - switch sds["xx"] { + switch le["XX"] { case "rx": msg = "Bundle transfer, received from" case "tx": msg = "Bundle transfer, sent to" default: - return s + return "", errors.New("unknown XX") } if nodeS != "" { msg += " node " + nodeS } - msg += " " + sds["pkt"] + msg += " " + le["Pkt"] if size != "" { msg += fmt.Sprintf(" (%s)", size) } - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } case "nncp-rm": - msg += "removing " + sds["file"] + msg += "removing " + le["File"] case "call-start": msg = fmt.Sprintf("Connection to %s", nodeS) - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } case "call-finish": - rx, err := strconv.ParseUint(sds["rxbytes"], 10, 64) + rx, err := strconv.ParseUint(le["RxBytes"], 10, 64) if err != nil { - return s + return "", err } - rxs, err := strconv.ParseUint(sds["rxspeed"], 10, 64) + rxs, err := strconv.ParseUint(le["RxSpeed"], 10, 64) if err != nil { - return s + return "", err } - tx, err := strconv.ParseUint(sds["txbytes"], 10, 64) + tx, err := strconv.ParseUint(le["TxBytes"], 10, 64) if err != nil { - return s + return "", err } - txs, err := strconv.ParseUint(sds["txspeed"], 10, 64) + txs, err := strconv.ParseUint(le["TxSpeed"], 10, 64) if err != nil { - return s + return "", err } msg = fmt.Sprintf( "Finished call with %s: %s received (%s/sec), %s transferred (%s/sec)", @@ -203,114 +187,117 @@ func (ctx *Ctx) Humanize(s string) string { case "sp-start": if nodeS == "" { msg += "SP" - if peer, exists := sds["peer"]; exists { + if peer, exists := le["Peer"]; exists { msg += fmt.Sprintf(": %s", peer) } } else { - nice, err := NicenessParse(sds["nice"]) + nice, err := NicenessParse(le["Nice"]) if err != nil { - return s + return "", err } msg += fmt.Sprintf("SP with %s (nice %s)", nodeS, NicenessFmt(nice)) } - if len(rem) > 0 { - msg += ": " + rem + if m, exists := le["Msg"]; exists { + msg += ": " + m } - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } case "sp-info": - nice, err := NicenessParse(sds["nice"]) + nice, err := NicenessParse(le["Nice"]) if err != nil { - return s + return "", err } msg = fmt.Sprintf( "Packet %s (%s) (nice %s)", - sds["pkt"], - size, - NicenessFmt(nice), + le["Pkt"], size, NicenessFmt(nice), ) - offsetParsed, err := strconv.ParseUint(sds["offset"], 10, 64) + offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64) if err != nil { - return s + return "", err } - sizeParsed, err := strconv.ParseUint(sds["size"], 10, 64) + sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) if err != nil { - return s + return "", err } msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) - if len(rem) > 0 { - msg += ": " + rem + if m, exists := le["Msg"]; exists { + msg += ": " + m } case "sp-infos": - switch sds["xx"] { + switch le["XX"] { case "rx": msg = fmt.Sprintf("%s has got for us: ", nodeS) case "tx": msg = fmt.Sprintf("We have got for %s: ", nodeS) default: - return s + return "", errors.New("unknown XX") } - msg += fmt.Sprintf("%s packets, %s", sds["pkts"], size) + msg += fmt.Sprintf("%s packets, %s", le["Pkts"], size) case "sp-process": - msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, sds["pkt"], size, rem) + msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, le["Pkt"], size, le["Msg"]) case "sp-file": - switch sds["xx"] { + switch le["XX"] { case "rx": msg = "Got packet " case "tx": msg = "Sent packet " default: - return s + return "", errors.New("unknown XX") } - fullsize, err := strconv.ParseUint(sds["fullsize"], 10, 64) + fullsize, err := strconv.ParseUint(le["FullSize"], 10, 64) if err != nil { - return s + return "", err } - sizeParsed, err := strconv.ParseUint(sds["size"], 10, 64) + sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) if err != nil { - return s + return "", err } msg += fmt.Sprintf( "%s %d%% (%s / %s)", - sds["pkt"], + le["Pkt"], 100*sizeParsed/fullsize, humanize.IBytes(uint64(sizeParsed)), humanize.IBytes(uint64(fullsize)), ) case "sp-done": - switch sds["xx"] { + switch le["XX"] { case "rx": - msg = fmt.Sprintf("Packet %s is retreived (%s)", sds["pkt"], size) + msg = fmt.Sprintf("Packet %s is retreived (%s)", le["Pkt"], size) case "tx": - msg = fmt.Sprintf("Packet %s is sent", sds["pkt"]) + msg = fmt.Sprintf("Packet %s is sent", le["Pkt"]) default: - return s + return "", errors.New("unknown XX") } case "nncp-reass": - chunkNum, exists := sds["chunk"] + chunkNum, exists := le["Chunk"] if exists { msg = fmt.Sprintf( "Reassembling chunked file \"%s\" (chunk %s): %s", - sds["path"], - chunkNum, - rem, + le["Path"], chunkNum, le["Msg"], ) } else { msg = fmt.Sprintf( "Reassembling chunked file \"%s\": %s", - sds["path"], - rem, + le["Path"], le["Msg"], ) } - if err, exists := sds["err"]; exists { + if err, exists := le["Err"]; exists { msg += ": " + err } case "lockdir": - msg = fmt.Sprintf("Acquire lock for %s: %s", sds["path"], sds["err"]) + msg = fmt.Sprintf("Acquire lock for %s: %s", le["Path"], le["Err"]) default: - return s + return "", errors.New("unknown Who") + } + when, err := time.Parse(time.RFC3339Nano, le["When"]) + if err != nil { + return "", err + } + var level string + if _, isErr := le["Err"]; isErr { + level = "ERROR " } - return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg) + return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg), nil } diff --git a/src/jobs.go b/src/jobs.go index 2979064..8fe4564 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -71,12 +71,12 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { fd.Close() // #nosec G104 continue } - ctx.LogD("jobs", SDS{ - "xx": string(xx), - "node": pktEnc.Sender, - "name": fi.Name(), - "nice": int(pktEnc.Nice), - "size": fi.Size(), + ctx.LogD("jobs", LEs{ + {"XX", string(xx)}, + {"Node", pktEnc.Sender}, + {"Name", fi.Name()}, + {"Nice", int(pktEnc.Nice)}, + {"Size", fi.Size()}, }, "taken") job := Job{ PktEnc: &pktEnc, diff --git a/src/lockdir.go b/src/lockdir.go index 919d4cf..2e6ac20 100644 --- a/src/lockdir.go +++ b/src/lockdir.go @@ -26,7 +26,7 @@ import ( func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) { if err := ctx.ensureRxDir(nodeId); err != nil { - ctx.LogE("lockdir", SDS{}, err, "") + ctx.LogE("lockdir", LEs{}, err, "") return nil, err } lockPath := filepath.Join(ctx.Spool, nodeId.String(), lockCtx) + ".lock" @@ -36,12 +36,12 @@ func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) { os.FileMode(0666), ) if err != nil { - ctx.LogE("lockdir", SDS{"path": lockPath}, err, "") + ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "") return nil, err } err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB) if err != nil { - ctx.LogE("lockdir", SDS{"path": lockPath}, err, "") + ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "") dirLock.Close() // #nosec G104 return nil, err } diff --git a/src/log.go b/src/log.go index df2b491..5c98bd7 100644 --- a/src/log.go +++ b/src/log.go @@ -18,54 +18,50 @@ along with this program. If not, see . package nncp import ( + "bytes" "fmt" "os" - "sort" - "strings" "time" + "go.cypherpunks.ru/recfile" "golang.org/x/sys/unix" ) -type LogLevel string - -type SDS map[string]interface{} +type LE struct { + K string + V interface{} +} +type LEs []LE -func sdFmt(who string, sds SDS) string { - keys := make([]string, 0, len(sds)) - for k, _ := range sds { - keys = append(keys, k) - } - sort.Strings(keys) - result := make([]string, 0, 1+len(keys)) - result = append(result, "["+who) - for _, k := range keys { - var value string - switch v := sds[k].(type) { +func (les LEs) Rec() string { + fields := make([]recfile.Field, 0, len(les)+1) + fields = append(fields, recfile.Field{ + Name: "When", Value: time.Now().UTC().Format(time.RFC3339Nano), + }) + var val string + for _, le := range les { + switch v := le.V.(type) { case int, int8, uint8, int64, uint64: - value = fmt.Sprintf("%d", v) + val = fmt.Sprintf("%d", v) default: - value = fmt.Sprintf("%s", v) + val = fmt.Sprintf("%s", v) } - result = append(result, fmt.Sprintf(`%s="%s"`, k, value)) + fields = append(fields, recfile.Field{Name: le.K, Value: val}) } - return strings.Join(result, " ") + "]" -} - -func msgFmt(level LogLevel, who string, sds SDS, msg string) string { - result := fmt.Sprintf( - "%s %s %s", - level, - time.Now().UTC().Format(time.RFC3339Nano), - sdFmt(who, sds), - ) - if len(msg) > 0 { - result += " " + msg + b := bytes.NewBuffer(make([]byte, 0, 1<<10)) + w := recfile.NewWriter(b) + _, err := w.RecordStart() + if err != nil { + panic(err) } - return result + "\n" + _, err = w.WriteFields(fields...) + if err != nil { + panic(err) + } + return b.String() } -func (ctx *Ctx) Log(msg string) { +func (ctx *Ctx) Log(rec string) { fdLock, err := os.OpenFile( ctx.LogPath+".lock", os.O_CREATE|os.O_WRONLY, @@ -92,42 +88,41 @@ func (ctx *Ctx) Log(msg string) { fmt.Fprintln(os.Stderr, "Can not open log:", err) return } - fd.WriteString(msg) // #nosec G104 + fd.WriteString(rec) // #nosec G104 fd.Close() // #nosec G104 } -func (ctx *Ctx) LogD(who string, sds SDS, msg string) { +func (ctx *Ctx) LogD(who string, les LEs, msg string) { if !ctx.Debug { return } - fmt.Fprint(os.Stderr, msgFmt(LogLevel("D"), who, sds, msg)) -} - -func (ctx *Ctx) LogI(who string, sds SDS, msg string) { - msg = msgFmt(LogLevel("I"), who, sds, msg) - if !ctx.Quiet { - fmt.Fprintln(os.Stderr, ctx.Humanize(msg)) + les = append(LEs{{"Debug", true}, {"Who", who}}, les...) + if msg != "" { + les = append(les, LE{"Msg", msg}) } - ctx.Log(msg) + fmt.Fprint(os.Stderr, les.Rec()) } -func (ctx *Ctx) LogE(who string, sds SDS, err error, msg string) { - sds["err"] = err.Error() - msg = msgFmt(LogLevel("E"), who, sds, msg) - if len(msg) > 2048 { - msg = msg[:2048] +func (ctx *Ctx) LogI(who string, les LEs, msg string) { + les = append(LEs{{"Who", who}}, les...) + if msg != "" { + les = append(les, LE{"Msg", msg}) } - fmt.Fprintln(os.Stderr, ctx.Humanize(msg)) - ctx.Log(msg) + rec := les.Rec() + if !ctx.Quiet { + fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec)) + } + ctx.Log(rec) } -func SdsAdd(sds, add SDS) SDS { - neu := SDS{} - for k, v := range sds { - neu[k] = v +func (ctx *Ctx) LogE(who string, les LEs, err error, msg string) { + les = append(LEs{{"Err", err.Error()}, {"Who", who}}, les...) + if msg != "" { + les = append(les, LE{"Msg", msg}) } - for k, v := range add { - neu[k] = v + rec := les.Rec() + if !ctx.Quiet { + fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec)) } - return neu + ctx.Log(rec) } diff --git a/src/nncp.go b/src/nncp.go index a775bd4..954dad1 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -38,7 +38,7 @@ along with this program. If not, see .` ) var ( - Version string = "5.7.0" + Version string = "6.0.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/progress.go b/src/progress.go index 61ad465..62e0f01 100644 --- a/src/progress.go +++ b/src/progress.go @@ -82,7 +82,7 @@ func CopyProgressed( dst io.Writer, src io.Reader, prgrsPrefix string, - sds SDS, + les LEs, showPrgrs bool, ) (written int64, err error) { buf := make([]byte, EncBlkSize) @@ -95,8 +95,7 @@ func CopyProgressed( if nw > 0 { written += int64(nw) if showPrgrs { - sds["size"] = written - Progress(prgrsPrefix, sds) + Progress(prgrsPrefix, append(les, LE{"Size", written})) } } if ew != nil { @@ -118,13 +117,20 @@ func CopyProgressed( return } -func Progress(prefix string, sds SDS) { +func Progress(prefix string, les LEs) { var size int64 - if sizeI, exists := sds["size"]; exists { - size = sizeI.(int64) + var fullsize int64 + var pkt string + for _, le := range les { + switch le.K { + case "Size": + size = le.V.(int64) + case "FullSize": + fullsize = le.V.(int64) + case "Pkt": + pkt = le.V.(string) + } } - fullsize := sds["fullsize"].(int64) - pkt := sds["pkt"].(string) progressBarsLock.RLock() pb, exists := progressBars[pkt] progressBarsLock.RUnlock() diff --git a/src/sp.go b/src/sp.go index 72ae7fb..3c6d2d4 100644 --- a/src/sp.go +++ b/src/sp.go @@ -311,18 +311,18 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var payloads [][]byte for _, info := range infos { payloads = append(payloads, MarshalSP(SPTypeInfo, info)) - ctx.LogD("sp-info-our", SDS{ - "node": nodeId, - "name": Base32Codec.EncodeToString(info.Hash[:]), - "size": info.Size, + ctx.LogD("sp-info-our", LEs{ + {"Node", nodeId}, + {"Name", Base32Codec.EncodeToString(info.Hash[:])}, + {"Size", info.Size}, }, "") } if totalSize > 0 { - ctx.LogI("sp-infos", SDS{ - "xx": string(TTx), - "node": nodeId, - "pkts": len(payloads), - "size": totalSize, + ctx.LogI("sp-infos", LEs{ + {"XX", string(TTx)}, + {"Node", nodeId}, + {"Pkts", len(payloads)}, + {"Size", totalSize}, }, "") } return payloadsSplit(payloads) @@ -392,31 +392,31 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.dirUnlock() return err } - sds := SDS{"node": nodeId, "nice": int(state.Nice)} - state.Ctx.LogD("sp-start", sds, "sending first message") + les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-start", les, "sending first message") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-start", les, err, "") state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "waiting for first message") + state.Ctx.LogD("sp-start", les, "waiting for first message") conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-start", les, err, "") state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-start", les, err, "") state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-start", les, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-start", les, err, "") state.dirUnlock() } return err @@ -447,14 +447,14 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.xxOnly = xxOnly var buf []byte var payload []byte - state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message") + state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SDS{}, err, "") + state.Ctx.LogE("sp-start", LEs{}, err, "") return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", SDS{}, err, "") + state.Ctx.LogE("sp-start", LEs{}, err, "") return err } @@ -467,7 +467,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } if node == nil { peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "") + state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "") return errors.New("Unknown peer: " + peerId) } state.Node = node @@ -475,7 +475,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - sds := SDS{"node": node.Id, "nice": int(state.Nice)} + les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err @@ -510,7 +510,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { firstPayload = append(firstPayload, SPHaltMarshalized...) } - state.Ctx.LogD("sp-start", sds, "sending first message") + state.Ctx.LogD("sp-start", les, "sending first message") buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) if err != nil { state.dirUnlock() @@ -518,11 +518,11 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-start", les, err, "") state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-start", les, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() @@ -535,7 +535,7 @@ func (state *SPState) StartWorkers( infosPayloads [][]byte, payload []byte, ) error { - sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) @@ -548,7 +548,7 @@ func (state *SPState) StartWorkers( for _, payload := range infosPayloads[1:] { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), + append(les, LE{"Size", len(payload)}), "queuing remaining payload", ) state.payloads <- payload @@ -560,12 +560,12 @@ func (state *SPState) StartWorkers( // Processing of first payload and queueing its responses state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), + append(les, LE{"Size", len(payload)}), "processing first payload", ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", sds, err, "") + state.Ctx.LogE("sp-work", les, err, "") return err } state.wg.Add(1) @@ -573,7 +573,7 @@ func (state *SPState) StartWorkers( for _, reply := range replies { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": len(reply)}), + append(les, LE{"Size", len(reply)}), "queuing reply", ) state.payloads <- reply @@ -632,7 +632,7 @@ func (state *SPState) StartWorkers( ) { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), + append(les, LE{"Size", len(payload)}), "queuing new info", ) state.payloads <- payload @@ -656,13 +656,13 @@ func (state *SPState) StartWorkers( var ping bool select { case <-state.pings: - state.Ctx.LogD("sp-xmit", sds, "got ping") + state.Ctx.LogD("sp-xmit", les, "got ping") payload = SPPingMarshalized ping = true case payload = <-state.payloads: state.Ctx.LogD( "sp-xmit", - SdsAdd(sds, SDS{"size": len(payload)}), + append(les, LE{"Size", len(payload)}), "got payload", ) default: @@ -677,12 +677,12 @@ func (state *SPState) StartWorkers( if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - sdsp := SdsAdd(sds, SDS{ - "xx": string(TTx), - "pkt": Base32Codec.EncodeToString(freq.Hash[:]), - "size": int64(freq.Offset), - }) - state.Ctx.LogD("sp-file", sdsp, "queueing") + lesp := append(les, LEs{ + {"XX", string(TTx)}, + {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}, + {"Size", int64(freq.Offset)}, + }...) + state.Ctx.LogD("sp-file", lesp, "queueing") fd, err := os.Open(filepath.Join( state.Ctx.Spool, state.Node.Id.String(), @@ -690,30 +690,30 @@ func (state *SPState) StartWorkers( Base32Codec.EncodeToString(freq.Hash[:]), )) if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") return } fi, err := fd.Stat() if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") return } fullSize := fi.Size() var buf []byte if freq.Offset < uint64(fullSize) { - state.Ctx.LogD("sp-file", sdsp, "seeking") + state.Ctx.LogD("sp-file", lesp, "seeking") if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") return } buf = buf[:n] - state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read") + state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read") } fd.Close() // #nosec G104 payload = MarshalSP(SPTypeFile, SPFile{ @@ -722,15 +722,15 @@ func (state *SPState) StartWorkers( Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - sdsp["size"] = int64(ourSize) - sdsp["fullsize"] = fullSize + lesp = append(lesp, LE{"Size", int64(ourSize)}) + lesp = append(lesp, LE{"FullSize", fullSize}) if state.Ctx.ShowPrgrs { - Progress("Tx", sdsp) + Progress("Tx", lesp) } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file", sdsp, "finished") + state.Ctx.LogD("sp-file", lesp, "finished") if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { @@ -740,14 +740,14 @@ func (state *SPState) StartWorkers( state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { - state.Ctx.LogD("sp-file", sdsp, "queue disappeared") + state.Ctx.LogD("sp-file", lesp, "queue disappeared") } state.Unlock() } - state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending") + state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { - state.Ctx.LogE("sp-xmit", sds, err, "") + state.Ctx.LogE("sp-xmit", les, err, "") return } } @@ -760,7 +760,7 @@ func (state *SPState) StartWorkers( if state.NotAlive() { break } - state.Ctx.LogD("sp-recv", sds, "waiting for payload") + state.Ctx.LogD("sp-recv", les, "waiting for payload") conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { @@ -774,27 +774,27 @@ func (state *SPState) StartWorkers( if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", sds, err, "") + state.Ctx.LogE("sp-recv", les, err, "") break } state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": len(payload)}), + append(les, LE{"Size", len(payload)}), "got payload", ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", sds, err, "") + state.Ctx.LogE("sp-recv", les, err, "") break } state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": len(payload)}), + append(les, LE{"Size", len(payload)}), "processing", ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", sds, err, "") + state.Ctx.LogE("sp-recv", les, err, "") break } state.wg.Add(1) @@ -802,7 +802,7 @@ func (state *SPState) StartWorkers( for _, reply := range replies { state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": len(reply)}), + append(les, LE{"Size", len(reply)}), "queuing reply", ) state.payloads <- reply @@ -841,16 +841,16 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} r := bytes.NewReader(payload) var err error var replies [][]byte var infosGot bool for r.Len() > 0 { - state.Ctx.LogD("sp-process", sds, "unmarshaling header") + state.Ctx.LogD("sp-process", les, "unmarshaling header") var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", sds, err, "") + state.Ctx.LogE("sp-process", les, err, "") return nil, err } if head.Type != SPTypePing { @@ -858,38 +858,38 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } switch head.Type { case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") + state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "") state.Lock() state.queueTheir = nil state.Unlock() case SPTypePing: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "") + state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") case SPTypeInfo: infosGot = true - sdsp := SdsAdd(sds, SDS{"type": "info"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "info"}) + state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", sdsp, err, "") + state.Ctx.LogE("sp-process", lesp, err, "") return nil, err } - sdsp = SdsAdd(sds, SDS{ - "pkt": Base32Codec.EncodeToString(info.Hash[:]), - "size": int64(info.Size), - "nice": int(info.Nice), - }) + lesp = append(lesp, LEs{ + {"Pkt", Base32Codec.EncodeToString(info.Hash[:])}, + {"Size", int64(info.Size)}, + {"Nice", int(info.Nice)}, + }...) if !state.listOnly && info.Nice > state.Nice { - state.Ctx.LogD("sp-process", sdsp, "too nice") + state.Ctx.LogD("sp-process", lesp, "too nice") continue } - state.Ctx.LogD("sp-process", sdsp, "received") + state.Ctx.LogD("sp-process", lesp, "received") if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.Ctx.LogD("sp-process", sdsp, "stating part") + state.Ctx.LogD("sp-process", lesp, "stating part") pktPath := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), @@ -897,14 +897,14 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { Base32Codec.EncodeToString(info.Hash[:]), ) if _, err = os.Stat(pktPath); err == nil { - state.Ctx.LogI("sp-info", sdsp, "already done") + state.Ctx.LogI("sp-info", lesp, "already done") if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + SeenSuffix); err == nil { - state.Ctx.LogI("sp-info", sdsp, "already seen") + state.Ctx.LogI("sp-info", lesp, "already seen") if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } @@ -916,14 +916,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { offset = fi.Size() } if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { - state.Ctx.LogI("sp-info", sdsp, "not enough space") + state.Ctx.LogI("sp-info", lesp, "not enough space") continue } - state.Ctx.LogI( - "sp-info", - SdsAdd(sdsp, SDS{"offset": offset}), - "", - ) + state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "") if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { replies = append(replies, MarshalSP( SPTypeFreq, @@ -931,51 +927,49 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { )) } case SPTypeFile: - sdsp := SdsAdd(sds, SDS{"type": "file"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "file"}) + state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "") + state.Ctx.LogE("sp-process", lesp, err, "") return nil, err } - sdsp["xx"] = string(TRx) - sdsp["pkt"] = Base32Codec.EncodeToString(file.Hash[:]) - sdsp["size"] = len(file.Payload) + lesp = append(lesp, LEs{ + {"XX", string(TRx)}, + {"Pkt", Base32Codec.EncodeToString(file.Hash[:])}, + {"Size", len(file.Payload)}, + }...) dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), ) filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) - state.Ctx.LogD("sp-file", sdsp, "opening part") + state.Ctx.LogD("sp-file", lesp, "opening part") fd, err := os.OpenFile( filePath+PartSuffix, os.O_RDWR|os.O_CREATE, os.FileMode(0666), ) if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") return nil, err } - state.Ctx.LogD( - "sp-file", - SdsAdd(sdsp, SDS{"offset": file.Offset}), - "seeking", - ) + state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") fd.Close() // #nosec G104 return nil, err } - state.Ctx.LogD("sp-file", sdsp, "writing") + state.Ctx.LogD("sp-file", lesp, "writing") _, err = fd.Write(file.Payload) if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") fd.Close() // #nosec G104 return nil, err } ourSize := int64(file.Offset + uint64(len(file.Payload))) - sdsp["size"] = ourSize + lesp[len(lesp)-1].V = ourSize fullsize := int64(0) state.RLock() infoTheir, ok := state.infosTheir[*file.Hash] @@ -983,9 +977,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if ok { fullsize = int64(infoTheir.Size) } - sdsp["fullsize"] = fullsize + lesp = append(lesp, LE{"FullSize", fullsize}) if state.Ctx.ShowPrgrs { - Progress("Rx", sdsp) + Progress("Rx", lesp) } if fullsize != ourSize { fd.Close() // #nosec G104 @@ -997,7 +991,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { spCheckerToken <- struct{}{} }() if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "sync") + state.Ctx.LogE("sp-file", lesp, err, "sync") fd.Close() // #nosec G104 return } @@ -1005,23 +999,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { defer state.wg.Done() if _, err = fd.Seek(0, io.SeekStart); err != nil { fd.Close() // #nosec G104 - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file", lesp, err, "") return } - state.Ctx.LogD("sp-file", sdsp, "checking") - gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs) + state.Ctx.LogD("sp-file", lesp, "checking") + gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs) fd.Close() // #nosec G104 if err != nil || !gut { - state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "") + state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") return } - state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") + state.Ctx.LogI("sp-done", lesp, "") if err = os.Rename(filePath+PartSuffix, filePath); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "rename") + state.Ctx.LogE("sp-file", lesp, err, "rename") return } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "sync") + state.Ctx.LogE("sp-file", lesp, err, "sync") return } state.Lock() @@ -1034,38 +1028,38 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { }() }() case SPTypeDone: - sdsp := SdsAdd(sds, SDS{"type": "done"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "done"}) + state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "") + state.Ctx.LogE("sp-process", lesp, err, "") return nil, err } - sdsp["pkt"] = Base32Codec.EncodeToString(done.Hash[:]) - state.Ctx.LogD("sp-done", sdsp, "removing") + lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) + state.Ctx.LogD("sp-done", lesp, "removing") err := os.Remove(filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(done.Hash[:]), )) - sdsp["xx"] = string(TTx) + lesp = append(lesp, LE{"XX", string(TTx)}) if err == nil { - state.Ctx.LogI("sp-done", sdsp, "") + state.Ctx.LogI("sp-done", lesp, "") } else { - state.Ctx.LogE("sp-done", sdsp, err, "") + state.Ctx.LogE("sp-done", lesp, err, "") } case SPTypeFreq: - sdsp := SdsAdd(sds, SDS{"type": "freq"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "freq"}) + state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", sdsp, err, "") + state.Ctx.LogE("sp-process", lesp, err, "") return nil, err } - sdsp["pkt"] = Base32Codec.EncodeToString(freq.Hash[:]) - sdsp["offset"] = freq.Offset - state.Ctx.LogD("sp-process", sdsp, "queueing") + lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}) + lesp = append(lesp, LE{"Offset", freq.Offset}) + state.Ctx.LogD("sp-process", lesp, "queueing") nice, exists := state.infosOurSeen[*freq.Hash] if exists { if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { @@ -1082,15 +1076,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} state.Unlock() } else { - state.Ctx.LogD("sp-process", sdsp, "skipping") + state.Ctx.LogD("sp-process", lesp, "skipping") } } else { - state.Ctx.LogD("sp-process", sdsp, "unknown") + state.Ctx.LogD("sp-process", lesp, "unknown") } default: state.Ctx.LogE( "sp-process", - SdsAdd(sds, SDS{"type": head.Type}), + append(les, LE{"Type", head.Type}), errors.New("unknown type"), "", ) @@ -1106,11 +1100,11 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { size += info.Size } state.RUnlock() - state.Ctx.LogI("sp-infos", SDS{ - "xx": string(TRx), - "node": state.Node.Id, - "pkts": pkts, - "size": int64(size), + state.Ctx.LogI("sp-infos", LEs{ + {"XX", string(TRx)}, + {"Node", state.Node.Id}, + {"Pkts", pkts}, + {"Size", int64(size)}, }, "") } return payloadsSplit(replies), nil diff --git a/src/tmp.go b/src/tmp.go index f1be071..e1e9127 100644 --- a/src/tmp.go +++ b/src/tmp.go @@ -44,7 +44,7 @@ func (ctx *Ctx) NewTmpFile() (*os.File, error) { } fd, err := TempFile(jobsPath, "") if err == nil { - ctx.LogD("tmp", SDS{"src": fd.Name()}, "created") + ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, "created") } return fd, err } @@ -109,7 +109,7 @@ func (tmp *TmpFileWHash) Commit(dir string) error { return err } checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) - tmp.ctx.LogD("tmp", SDS{"src": tmp.Fd.Name(), "dst": checksum}, "commit") + tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit") if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil { return err } diff --git a/src/toss.go b/src/toss.go index ad8acff..cc49784 100644 --- a/src/toss.go +++ b/src/toss.go @@ -71,7 +71,7 @@ func (ctx *Ctx) Toss( ) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { - ctx.LogE("rx", SDS{}, err, "lock") + ctx.LogE("rx", LEs{}, err, "lock") return false } defer ctx.UnlockDir(dirLock) @@ -84,9 +84,9 @@ func (ctx *Ctx) Toss( defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { pktName := filepath.Base(job.Fd.Name()) - sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName} + les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}} if job.PktEnc.Nice > nice { - ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice") + ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice") job.Fd.Close() // #nosec G104 continue } @@ -113,7 +113,7 @@ func (ctx *Ctx) Toss( var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { - ctx.LogE("rx", sds, err, "unmarshal") + ctx.LogE("rx", les, err, "unmarshal") isBad = true goto Closing } @@ -123,8 +123,8 @@ func (ctx *Ctx) Toss( pktSize -= poly1305.TagSize } pktSize -= pktSizeBlocks * poly1305.TagSize - sds["size"] = pktSize - ctx.LogD("rx", sds, "taken") + les = append(les, LE{"Size", pktSize}) + ctx.LogD("rx", les, "taken") switch pkt.Type { case PktTypeExec, PktTypeExecFat: if noExec { @@ -137,14 +137,14 @@ func (ctx *Ctx) Toss( args = append(args, string(p)) } argsStr := strings.Join(append([]string{handle}, args...), " ") - sds := SdsAdd(sds, SDS{ - "type": "exec", - "dst": argsStr, - }) + les = append(les, LEs{ + {"Type", "exec"}, + {"Dst", argsStr}, + }...) sender := ctx.Neigh[*job.PktEnc.Sender] cmdline, exists := sender.Exec[handle] if !exists || len(cmdline) == 0 { - ctx.LogE("rx", sds, errors.New("No handle found"), "") + ctx.LogE("rx", les, errors.New("No handle found"), "") isBad = true goto Closing } @@ -171,7 +171,7 @@ func (ctx *Ctx) Toss( } output, err := cmd.Output() if err != nil { - ctx.LogE("rx", sds, err, "handle") + ctx.LogE("rx", les, err, "handle") isBad = true goto Closing } @@ -189,12 +189,12 @@ func (ctx *Ctx) Toss( "Exec from %s: %s", sender.Name, argsStr, ), output) if err = cmd.Run(); err != nil { - ctx.LogE("rx", sds, err, "notify") + ctx.LogE("rx", les, err, "notify") } } } } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, "") if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { @@ -202,7 +202,7 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + ctx.LogE("rx", les, err, "remove") isBad = true } } @@ -211,57 +211,57 @@ func (ctx *Ctx) Toss( goto Closing } dst := string(pkt.Path[:int(pkt.PathLen)]) - sds := SdsAdd(sds, SDS{"type": "file", "dst": dst}) + les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...) if filepath.IsAbs(dst) { - ctx.LogE("rx", sds, errors.New("non-relative destination path"), "") + ctx.LogE("rx", les, errors.New("non-relative destination path"), "") isBad = true goto Closing } incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming if incoming == nil { - ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "") + ctx.LogE("rx", les, errors.New("incoming is not allowed"), "") isBad = true goto Closing } dir := filepath.Join(*incoming, path.Dir(dst)) if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { - ctx.LogE("rx", sds, err, "mkdir") + ctx.LogE("rx", les, err, "mkdir") isBad = true goto Closing } if !dryRun { tmp, err := TempFile(dir, "file") if err != nil { - ctx.LogE("rx", sds, err, "mktemp") + ctx.LogE("rx", les, err, "mktemp") isBad = true goto Closing } - sds["tmp"] = tmp.Name() - ctx.LogD("rx", sds, "created") + les = append(les, LE{"Tmp", tmp.Name()}) + ctx.LogD("rx", les, "created") bufW := bufio.NewWriter(tmp) if _, err = CopyProgressed( bufW, pipeR, "Rx file", - SdsAdd(sds, SDS{"fullsize": sds["size"]}), + append(les, LE{"FullSize", pktSize}), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx", les, err, "copy") isBad = true goto Closing } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx", les, err, "copy") isBad = true goto Closing } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx", les, err, "copy") isBad = true goto Closing } if err = tmp.Close(); err != nil { - ctx.LogE("rx", sds, err, "copy") + ctx.LogE("rx", les, err, "copy") isBad = true goto Closing } @@ -273,7 +273,7 @@ func (ctx *Ctx) Toss( if os.IsNotExist(err) { break } - ctx.LogE("rx", sds, err, "stat") + ctx.LogE("rx", les, err, "stat") isBad = true goto Closing } @@ -281,16 +281,16 @@ func (ctx *Ctx) Toss( dstPathCtr++ } if err = os.Rename(tmp.Name(), dstPath); err != nil { - ctx.LogE("rx", sds, err, "rename") + ctx.LogE("rx", les, err, "rename") isBad = true } if err = DirSync(*incoming); err != nil { - ctx.LogE("rx", sds, err, "sync") + ctx.LogE("rx", les, err, "sync") isBad = true } - delete(sds, "tmp") + les = les[:len(les)-1] // delete Tmp } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, "") if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { @@ -298,7 +298,7 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + ctx.LogE("rx", les, err, "remove") isBad = true } if len(sendmail) > 0 && ctx.NotifyFile != nil { @@ -313,7 +313,7 @@ func (ctx *Ctx) Toss( humanize.IBytes(uint64(pktSize)), ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", sds, err, "notify") + ctx.LogE("rx", les, err, "notify") } } } @@ -323,23 +323,23 @@ func (ctx *Ctx) Toss( } src := string(pkt.Path[:int(pkt.PathLen)]) if filepath.IsAbs(src) { - ctx.LogE("rx", sds, errors.New("non-relative source path"), "") + ctx.LogE("rx", les, errors.New("non-relative source path"), "") isBad = true goto Closing } - sds := SdsAdd(sds, SDS{"type": "freq", "src": src}) + les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...) dstRaw, err := ioutil.ReadAll(pipeR) if err != nil { - ctx.LogE("rx", sds, err, "read") + ctx.LogE("rx", les, err, "read") isBad = true goto Closing } dst := string(dstRaw) - sds["dst"] = dst + les = append(les, LE{"Dst", dst}) sender := ctx.Neigh[*job.PktEnc.Sender] freqPath := sender.FreqPath if freqPath == nil { - ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "") + ctx.LogE("rx", les, errors.New("freqing is not allowed"), "") isBad = true goto Closing } @@ -354,12 +354,12 @@ func (ctx *Ctx) Toss( sender.FreqMaxSize, ) if err != nil { - ctx.LogE("rx", sds, err, "tx file") + ctx.LogE("rx", les, err, "tx file") isBad = true goto Closing } } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, "") if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { @@ -367,7 +367,7 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + ctx.LogE("rx", les, err, "remove") isBad = true } if len(sendmail) > 0 && ctx.NotifyFreq != nil { @@ -379,7 +379,7 @@ func (ctx *Ctx) Toss( "Freq from %s: %s", sender.Name, src, ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", sds, err, "notify") + ctx.LogE("rx", les, err, "notify") } } } @@ -391,21 +391,21 @@ func (ctx *Ctx) Toss( copy(dst[:], pkt.Path[:int(pkt.PathLen)]) nodeId := NodeId(*dst) node, known := ctx.Neigh[nodeId] - sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId}) + les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...) if !known { - ctx.LogE("rx", sds, errors.New("unknown node"), "") + ctx.LogE("rx", les, errors.New("unknown node"), "") isBad = true goto Closing } - ctx.LogD("rx", sds, "taken") + ctx.LogD("rx", les, "taken") if !dryRun { if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil { - ctx.LogE("rx", sds, err, "tx trns") + ctx.LogE("rx", les, err, "tx trns") isBad = true goto Closing } } - ctx.LogI("rx", sds, "") + ctx.LogI("rx", les, "") if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { @@ -413,12 +413,12 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("rx", sds, err, "remove") + ctx.LogE("rx", les, err, "remove") isBad = true } } default: - ctx.LogE("rx", sds, errors.New("unknown type"), "") + ctx.LogE("rx", les, errors.New("unknown type"), "") isBad = true } Closing: diff --git a/src/tx.go b/src/tx.go index 3352f86..c89957b 100644 --- a/src/tx.go +++ b/src/tx.go @@ -80,10 +80,10 @@ func (ctx *Ctx) Tx( curSize := size pipeR, pipeW := io.Pipe() go func(size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": hops[0].Id, - "nice": int(nice), - "size": size, + ctx.LogD("tx", LEs{ + {"Node", hops[0].Id}, + {"Nice", int(nice)}, + {"Size", size}, }, "wrote") errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) dst.Close() // #nosec G104 @@ -96,10 +96,10 @@ func (ctx *Ctx) Tx( pipeRPrev = pipeR pipeR, pipeW = io.Pipe() go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { - ctx.LogD("tx", SDS{ - "node": node.Id, - "nice": int(nice), - "size": size, + ctx.LogD("tx", LEs{ + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, }, "trns wrote") errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) dst.Close() // #nosec G104 @@ -109,7 +109,7 @@ func (ctx *Ctx) Tx( go func() { _, err := CopyProgressed( tmp.W, pipeR, "Tx", - SDS{"pkt": pktName, "fullsize": curSize}, + LEs{{"Pkt", pktName}, {"FullSize", curSize}}, ctx.ShowPrgrs, ) errs <- err @@ -347,18 +347,18 @@ func (ctx *Ctx) TxFile( return err } _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": dstPath, - "size": fileSize, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", dstPath}, + {"Size", fileSize}, } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, "sent") } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, "sent") } return err } @@ -403,18 +403,18 @@ func (ctx *Ctx) TxFile( io.TeeReader(reader, hsh), path, ) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": sizeToSend, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", sizeToSend}, } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, "sent") } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, "sent") return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -436,18 +436,18 @@ func (ctx *Ctx) TxFile( } metaPktSize := int64(metaBuf.Len()) _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": int(nice), - "src": srcPath, - "dst": path, - "size": metaPktSize, + les := LEs{ + {"Type", "file"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Src", srcPath}, + {"Dst", path}, + {"Size", metaPktSize}, } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, "sent") } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, "sent") } return err } @@ -472,18 +472,18 @@ func (ctx *Ctx) TxFreq( src := strings.NewReader(dstPath) size := int64(src.Len()) _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) - sds := SDS{ - "type": "freq", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "src": srcPath, - "dst": dstPath, + les := LEs{ + {"Type", "freq"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Src", srcPath}, + {"Dst", dstPath}, } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, "sent") } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, "sent") } return err } @@ -583,33 +583,33 @@ func (ctx *Ctx) TxExec( _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle) } - sds := SDS{ - "type": "exec", - "node": node.Id, - "nice": int(nice), - "replynice": int(replyNice), - "dst": strings.Join(append([]string{handle}, args...), " "), - "size": size, + les := LEs{ + {"Type", "exec"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"ReplyNice", int(replyNice)}, + {"Dst", strings.Join(append([]string{handle}, args...), " ")}, + {"Size", size}, } if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, "sent") } else { - ctx.LogE("tx", sds, err, "sent") + ctx.LogE("tx", les, err, "sent") } return err } func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error { - sds := SDS{ - "type": "trns", - "node": node.Id, - "nice": int(nice), - "size": size, + les := LEs{ + {"Type", "trns"}, + {"Node", node.Id}, + {"Nice", int(nice)}, + {"Size", size}, } - ctx.LogD("tx", sds, "taken") + ctx.LogD("tx", les, "taken") if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", sds, err, err.Error()) + ctx.LogE("tx", les, err, err.Error()) return err } tmp, err := ctx.NewTmpFileWHash() @@ -618,7 +618,7 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error } if _, err = CopyProgressed( tmp.W, src, "Tx trns", - SDS{"pkt": node.Id.String(), "fullsize": size}, + LEs{{"Pkt", node.Id.String()}, {"FullSize", size}}, ctx.ShowPrgrs, ); err != nil { return err @@ -626,9 +626,9 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error nodePath := filepath.Join(ctx.Spool, node.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) if err == nil { - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", les, "sent") } else { - ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogI("tx", append(les, LE{"Err", err}), "sent") } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104 return err