From 5a9bf58a2638e42f2d42fa4d43c363a664fe8198 Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Sat, 6 Mar 2021 15:05:59 +0300 Subject: [PATCH] Logging refactoring, no centralized humanizer --- doc/news.ru.texi | 10 + doc/news.texi | 10 + src/call.go | 50 ++- src/check.go | 13 +- src/cmd/nncp-bundle/main.go | 141 ++++++-- src/cmd/nncp-caller/main.go | 37 +- src/cmd/nncp-daemon/main.go | 37 +- src/cmd/nncp-reass/main.go | 78 +++-- src/cmd/nncp-rm/main.go | 25 +- src/cmd/nncp-xfer/main.go | 192 ++++++++--- src/ctx.go | 8 +- src/humanizer.go | 268 +-------------- src/jobs.go | 29 +- src/lockdir.go | 9 +- src/log.go | 18 +- src/nice.go | 14 + src/nncp.go | 2 +- src/sortbynice.go | 15 - src/sp.go | 663 ++++++++++++++++++++++++++++-------- src/tmp.go | 13 +- src/toss.go | 341 ++++++++++++++++--- src/tx.go | 99 +++++- 22 files changed, 1434 insertions(+), 638 deletions(-) delete mode 100644 src/sortbynice.go diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 9e9c669..1a9dc69 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,6 +1,16 @@ @node Новости @section Новости +@node Релиз 6.2.0 +@subsection Релиз 6.2.0 +@itemize + +@item +Очередной рефакторинг и упрощение системы журналирования. +Не должно быть видимых изменений для конечного пользователя. + +@end itemize + @node Релиз 6.1.0 @subsection Релиз 6.1.0 @itemize diff --git a/doc/news.texi b/doc/news.texi index 6be8466..8c809d3 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,6 +3,16 @@ See also this page @ref{Новости, on russian}. +@node Release 6.2.0 +@section Release 6.2.0 +@itemize + +@item +Yet another logging refactoring and simplification. +Should be no visible differences to the end user. + +@end itemize + @node Release 6.1.0 @section Release 6.1.0 @itemize diff --git a/src/call.go b/src/call.go index 2f635d6..ab254b2 100644 --- a/src/call.go +++ b/src/call.go @@ -18,9 +18,11 @@ along with this program. If not, see . package nncp import ( + "fmt" "net" "time" + "github.com/dustin/go-humanize" "github.com/gorhill/cronexpr" ) @@ -57,7 +59,9 @@ func (ctx *Ctx) CallNode( ) (isGood bool) { for _, addr := range addrs { les := LEs{{"Node", node.Id}, {"Addr", addr}} - ctx.LogD("call", les, "dialing") + ctx.LogD("calling", les, func(les LEs) string { + return fmt.Sprintf("Calling %s (%s)", node.Name, addr) + }) var conn ConnDeadlined var err error if addr[0] == '|' { @@ -66,10 +70,14 @@ func (ctx *Ctx) CallNode( conn, err = net.Dial("tcp", addr) } if err != nil { - ctx.LogD("call", append(les, LE{"Err", err}), "dialing") + ctx.LogD("calling", append(les, LE{"Err", err}), func(les LEs) string { + return fmt.Sprintf("Calling %s (%s)", node.Name, addr) + }) continue } - ctx.LogD("call", les, "connected") + ctx.LogD("call-connected", les, func(les LEs) string { + return fmt.Sprintf("Connected %s (%s)", node.Name, addr) + }) state := SPState{ Ctx: ctx, Node: node, @@ -84,21 +92,37 @@ func (ctx *Ctx) CallNode( onlyPkts: onlyPkts, } if err = state.StartI(conn); err == nil { - ctx.LogI("call-start", les, "connected") + ctx.LogI("call-started", les, func(les LEs) string { + return fmt.Sprintf("Connection to %s (%s)", node.Name, addr) + }) state.Wait() - ctx.LogI("call-finish", LEs{ - {"Node", state.Node.Id}, - {"Duration", int64(state.Duration.Seconds())}, - {"RxBytes", state.RxBytes}, - {"TxBytes", state.TxBytes}, - {"RxSpeed", state.RxSpeed}, - {"TxSpeed", state.TxSpeed}, - }, "") + ctx.LogI("call-finished", append( + les, + LE{"Duration", int64(state.Duration.Seconds())}, + LE{"RxBytes", state.RxBytes}, + LE{"RxSpeed", state.RxSpeed}, + LE{"TxBytes", state.TxBytes}, + LE{"TxSpeed", state.TxSpeed}, + ), func(les LEs) string { + return fmt.Sprintf( + "Finished call with %s (%d:%d:%d): %s received (%s/sec), %s transferred (%s/sec)", + node.Name, + int(state.Duration.Hours()), + int(state.Duration.Minutes()), + int(state.Duration.Seconds()), + humanize.IBytes(uint64(state.RxBytes)), + humanize.IBytes(uint64(state.RxSpeed)), + humanize.IBytes(uint64(state.TxBytes)), + humanize.IBytes(uint64(state.TxSpeed)), + ) + }) isGood = true conn.Close() // #nosec G104 break } else { - ctx.LogE("call-start", les, err, "") + ctx.LogE("call-started", les, err, func(les LEs) string { + return fmt.Sprintf("Connection to %s (%s)", node.Name, addr) + }) conn.Close() // #nosec G104 } } diff --git a/src/check.go b/src/check.go index ac26e9c..a8a32e2 100644 --- a/src/check.go +++ b/src/check.go @@ -21,6 +21,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "io" "log" "os" @@ -45,26 +46,30 @@ func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool { isBad := false for job := range ctx.Jobs(nodeId, xx) { + pktName := Base32Codec.EncodeToString(job.HshValue[:]) les := LEs{ {"XX", string(xx)}, {"Node", nodeId}, - {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])}, + {"Pkt", pktName}, {"FullSize", job.Size}, } + logMsg := func(les LEs) string { + return fmt.Sprintf("Checking: %s/%s/%s", nodeId, string(xx), pktName) + } fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("check", les, err, "") + ctx.LogE("checking", les, err, logMsg) return true } gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs) fd.Close() // #nosec G104 if err != nil { - ctx.LogE("check", les, err, "") + ctx.LogE("checking", les, err, logMsg) return true } if !gut { isBad = true - ctx.LogE("check", les, errors.New("bad"), "") + ctx.LogE("checking", les, errors.New("bad"), logMsg) } } return isBad diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index d7813a9..365d8b5 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -33,6 +33,7 @@ import ( "strings" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "go.cypherpunks.ru/nncp/v6" "golang.org/x/crypto/blake2b" ) @@ -119,16 +120,22 @@ func main() { bufStdout := bufio.NewWriter(os.Stdout) tarWr := tar.NewWriter(bufStdout) for nodeId := range nodeIds { - les := nncp.LEs{ - {K: "XX", V: string(nncp.TTx)}, - {K: "Node", V: nodeId.String()}, - {K: "Pkt", V: "dummy"}, - } for job := range ctx.Jobs(&nodeId, nncp.TTx) { pktName = filepath.Base(job.Path) - les[len(les)-1].V = pktName + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId.String()}, + {K: "Pkt", V: pktName}, + } if job.PktEnc.Nice > nice { - ctx.LogD("nncp-bundle", les, "too nice") + ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer %s/tx/%s: too nice %s", + ctx.NodeName(&nodeId), + pktName, + nncp.NicenessFmt(job.PktEnc.Nice), + ) + }) continue } fd, err := os.Open(job.Path) @@ -183,7 +190,18 @@ func main() { os.Remove(job.Path + nncp.HdrSuffix) } } - ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "") + ctx.LogI( + "bundle-tx", + append(les, nncp.LE{K: "Size", V: job.Size}), + func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer, sent to node %s %s (%s)", + ctx.NodeName(&nodeId), + pktName, + humanize.IBytes(uint64(job.Size)), + ) + }, + ) } } if err = tarWr.Close(); err != nil { @@ -214,18 +232,25 @@ func main() { if err != nil { if err != io.EOF { ctx.LogD( - "nncp-bundle", + "bundle-rx-read-tar", nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, - "error reading tar", + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) } continue } if entry.Typeflag != tar.TypeDir { ctx.LogD( - "nncp-bundle", - nncp.LEs{{K: "XX", V: string(nncp.TRx)}}, - "Expected NNCP/", + "bundle-rx-read-tar", + nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Err", V: errors.New("expected NNCP/")}, + }, + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) continue } @@ -233,47 +258,74 @@ func main() { if err != nil { if err != io.EOF { ctx.LogD( - "nncp-bundle", + "bundle-rx-read-tar", nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, - "error reading tar", + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) } continue } les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}} + logMsg := func(les nncp.LEs) string { + return "Bundle transfer rx/" + entry.Name + } if entry.Size < nncp.PktEncOverhead { - ctx.LogD("nncp-bundle", les, "Too small packet") + ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string { + return logMsg(les) + ": too small packet" + }) continue } if !ctx.IsEnoughSpace(entry.Size) { - ctx.LogE("nncp-bundle", les, errors.New("not enough spool space"), "") + ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg) continue } pktName := filepath.Base(entry.Name) if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil { - ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: "bad packet name"}), "") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "bad packet name"}), + logMsg, + ) continue } if _, err = io.ReadFull(tarR, pktEncBuf); err != nil { - ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: err}), "read") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: err}), + logMsg, + ) continue } if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil { - ctx.LogD("nncp-bundle", les, "Bad packet structure") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "Bad packet structure"}), + logMsg, + ) continue } if pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-bundle", les, "Bad packet magic number") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "Bad packet magic number"}), + logMsg, + ) continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-bundle", les, "too nice") + ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) continue } if *pktEnc.Sender == *ctx.SelfId && *doDelete { if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Recipient]; !exists { - ctx.LogD("nncp-bundle", les, "Recipient is not requested") + ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": recipient is not requested" + }) continue } } @@ -283,9 +335,14 @@ func main() { {K: "Node", V: nodeId32}, {K: "Pkt", V: pktName}, } + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName) + } dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName) if _, err = os.Stat(dstPath); err != nil { - ctx.LogD("nncp-bundle", les, "Packet is already missing") + ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet is already missing" + }) continue } hsh, err := blake2b.New256(nil) @@ -303,7 +360,9 @@ func main() { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { - ctx.LogI("nncp-bundle", les, "removed") + ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string { + return logMsg(les) + ": removed" + }) if !*dryRun { os.Remove(dstPath) if ctx.HdrUsage { @@ -311,17 +370,21 @@ func main() { } } } else { - ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") + ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg) } continue } if *pktEnc.Recipient != *ctx.SelfId { - ctx.LogD("nncp-bundle", les, "Unknown recipient") + ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string { + return logMsg(les) + ": unknown recipient" + }) continue } if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Sender]; !exists { - ctx.LogD("nncp-bundle", les, "Sender is not requested") + ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": sender is not requested" + }) continue } } @@ -332,14 +395,21 @@ func main() { {K: "Pkt", V: pktName}, {K: "FullSize", V: entry.Size}, } + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName) + } dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx)) dstPath := filepath.Join(dstDirPath, pktName) if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", les, "Packet already exists") + ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet already exists" + }) continue } if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", les, "Packet already exists") + ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet already seen" + }) continue } if *doCheck { @@ -355,7 +425,7 @@ func main() { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName { - ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") + ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) continue } } else { @@ -377,7 +447,7 @@ func main() { log.Fatalln("Error during commiting:", err) } } else { - ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") + ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) tmp.Cancel() continue } @@ -428,7 +498,12 @@ func main() { break } } - ctx.LogI("nncp-bundle", les, "") + ctx.LogI("bundle-rx", les, func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer, received from %s %s (%s)", + sender, pktName, humanize.IBytes(uint64(entry.Size)), + ) + }) } } } diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index dd18ac3..dd93bff 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -86,7 +86,13 @@ func main() { log.Fatalln("Invalid NODE specified:", err) } if len(node.Calls) == 0 { - ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping") + ctx.LogD( + "caller-no-calls", + nncp.LEs{{K: "Node", V: node.Id}}, + func(les nncp.LEs) string { + return fmt.Sprintf("%s node has no calls, skipping", node.Name) + }, + ) continue } nodes = append(nodes, node) @@ -94,7 +100,13 @@ func main() { } else { for _, node := range ctx.Neigh { if len(node.Calls) == 0 { - ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping") + ctx.LogD( + "caller-no-calls", + nncp.LEs{{K: "Node", V: node.Id}}, + func(les nncp.LEs) string { + return fmt.Sprintf("%s node has no calls, skipping", node.Name) + }, + ) continue } nodes = append(nodes, node) @@ -116,26 +128,35 @@ func main() { addrs = append(addrs, *call.Addr) } les := nncp.LEs{{K: "Node", V: node.Id}, {K: "CallIndex", V: i}} + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf("%s node, call %d", node.Name, i) + } for { n := time.Now() t := call.Cron.Next(n) - ctx.LogD("caller", les, t.String()) + ctx.LogD("caller-time", les, func(les nncp.LEs) string { + return logMsg(les) + ": " + t.String() + }) if t.IsZero() { - ctx.LogE("caller", les, errors.New("got zero time"), "") + ctx.LogE("caller", les, errors.New("got zero time"), logMsg) return } time.Sleep(t.Sub(n)) node.Lock() if node.Busy { node.Unlock() - ctx.LogD("caller", les, "busy") + ctx.LogD("caller-busy", les, func(les nncp.LEs) string { + return logMsg(les) + ": busy" + }) continue } else { node.Busy = true node.Unlock() if call.WhenTxExists && call.Xx != "TRx" { - ctx.LogD("caller", les, "checking tx existence") + ctx.LogD("caller", les, func(les nncp.LEs) string { + return logMsg(les) + ": checking tx existence" + }) txExists := false for job := range ctx.Jobs(node.Id, nncp.TTx) { if job.PktEnc.Nice > call.Nice { @@ -144,7 +165,9 @@ func main() { txExists = true } if !txExists { - ctx.LogD("caller", les, "no tx") + ctx.LogD("caller-no-tx", les, func(les nncp.LEs) string { + return logMsg(les) + ": no tx" + }) node.Lock() node.Busy = false node.Unlock() diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 3b40f1c..a69175a 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -26,6 +26,7 @@ import ( "os" "time" + "github.com/dustin/go-humanize" "go.cypherpunks.ru/nncp/v6" "golang.org/x/net/netutil" ) @@ -79,17 +80,33 @@ func performSP( NoCK: noCK, } if err := state.StartR(conn); err == nil { - ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected") + ctx.LogI( + "call-started", + nncp.LEs{{K: "Node", V: state.Node.Id}}, + func(les nncp.LEs) string { return "Connection with " + state.Node.Name }, + ) nodeIdC <- state.Node.Id state.Wait() - ctx.LogI("call-finish", nncp.LEs{ + ctx.LogI("call-finished", nncp.LEs{ {K: "Node", V: state.Node.Id}, {K: "Duration", V: int64(state.Duration.Seconds())}, {K: "RxBytes", V: state.RxBytes}, {K: "TxBytes", V: state.TxBytes}, {K: "RxSpeed", V: state.RxSpeed}, {K: "TxSpeed", V: state.TxSpeed}, - }, "") + }, func(les nncp.LEs) string { + return fmt.Sprintf( + "Finished call with %s (%d:%d:%d): %s received (%s/sec), %s transferred (%s/sec)", + state.Node.Name, + int(state.Duration.Hours()), + int(state.Duration.Minutes()), + int(state.Duration.Seconds()), + humanize.IBytes(uint64(state.RxBytes)), + humanize.IBytes(uint64(state.RxSpeed)), + humanize.IBytes(uint64(state.TxBytes)), + humanize.IBytes(uint64(state.TxSpeed)), + ) + }) } else { nodeId := "unknown" if state.Node == nil { @@ -98,7 +115,11 @@ func performSP( nodeIdC <- state.Node.Id nodeId = state.Node.Id.String() } - ctx.LogI("call-start", nncp.LEs{{K: "Node", V: nodeId}}, "connected") + ctx.LogI( + "call-started", + nncp.LEs{{K: "Node", V: nodeId}}, + func(les nncp.LEs) string { return "Connected to " + state.Node.Name }, + ) } close(nodeIdC) } @@ -197,7 +218,13 @@ func main() { if err != nil { log.Fatalln("Can not accept connection:", err) } - ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted") + ctx.LogD( + "daemon-accepted", + nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, + func(les nncp.LEs) string { + return "Accepted connection with " + conn.RemoteAddr().String() + }, + ) go func(conn net.Conn) { nodeIdC := make(chan *nncp.NodeId) go performSP(ctx, conn, nice, *noCK, nodeIdC) diff --git a/src/cmd/nncp-reass/main.go b/src/cmd/nncp-reass/main.go index 8dc1b68..4ed30a5 100644 --- a/src/cmd/nncp-reass/main.go +++ b/src/cmd/nncp-reass/main.go @@ -58,19 +58,24 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo } var metaPkt nncp.ChunkedMeta les := nncp.LEs{{K: "Path", V: path}} + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf("Reassembling chunked file \"%s\"", path) + } if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil { - ctx.LogE("nncp-reass", les, err, "bad meta file") + ctx.LogE("reass-bad-meta", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": bad meta" + }) return false } fd.Close() // #nosec G104 if metaPkt.Magic != nncp.MagicNNCPMv1 { - ctx.LogE("nncp-reass", les, nncp.BadMagic, "") + ctx.LogE("reass", les, nncp.BadMagic, logMsg) return false } metaName := filepath.Base(path) if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) { - ctx.LogE("nncp-reass", les, errors.New("invalid filename suffix"), "") + ctx.LogE("reass", les, errors.New("invalid filename suffix"), logMsg) return false } mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta) @@ -108,7 +113,9 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo fi, err := os.Stat(chunkPath) lesChunk := append(les, nncp.LE{K: "Chunk", V: chunkNum}) if err != nil && os.IsNotExist(err) { - ctx.LogI("nncp-reass", lesChunk, "missing") + ctx.LogI("reass-chunk-miss", lesChunk, func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d missing", logMsg(les), chunkNum) + }) allChunksExist = false continue } @@ -119,7 +126,14 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo badSize = uint64(fi.Size()) != metaPkt.ChunkSize } if badSize { - ctx.LogE("nncp-reass", lesChunk, errors.New("invalid size"), "") + ctx.LogE( + "reass-chunk", + lesChunk, + errors.New("invalid size"), + func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum) + }, + ) allChunksExist = false } } @@ -144,10 +158,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo } if _, err = nncp.CopyProgressed( hsh, bufio.NewReader(fd), "check", - nncp.LEs{ - {K: "Pkt", V: chunkPath}, - {K: "FullSize", V: fi.Size()}, - }, + nncp.LEs{{K: "Pkt", V: chunkPath}, {K: "FullSize", V: fi.Size()}}, ctx.ShowPrgrs, ); err != nil { log.Fatalln(err) @@ -155,9 +166,12 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo fd.Close() // #nosec G104 if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 { ctx.LogE( - "nncp-reass", + "reass-chunk", nncp.LEs{{K: "Path", V: path}, {K: "Chunk", V: chunkNum}}, - errors.New("checksum is bad"), "", + errors.New("checksum is bad"), + func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum) + }, ) allChecksumsGood = false } @@ -166,7 +180,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo return false } if dryRun { - ctx.LogI("nncp-reass", nncp.LEs{{K: "path", V: path}}, "ready") + ctx.LogI("reass", nncp.LEs{{K: "path", V: path}}, logMsg) return true } @@ -181,7 +195,9 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo log.Fatalln(err) } les = nncp.LEs{{K: "path", V: path}, {K: "Tmp", V: tmp.Name()}} - ctx.LogD("nncp-reass", les, "created") + ctx.LogD("reass-tmp-created", les, func(les nncp.LEs) string { + return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name()) + }) dst = tmp } dstW := bufio.NewWriter(dst) @@ -198,10 +214,7 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo } if _, err = nncp.CopyProgressed( dstW, bufio.NewReader(fd), "reass", - nncp.LEs{ - {K: "Pkt", V: chunkPath}, - {K: "FullSize", V: fi.Size()}, - }, + nncp.LEs{{K: "Pkt", V: chunkPath}, {K: "FullSize", V: fi.Size()}}, ctx.ShowPrgrs, ); err != nil { log.Fatalln(err) @@ -209,7 +222,13 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo fd.Close() // #nosec G104 if !keep { if err = os.Remove(chunkPath); err != nil { - ctx.LogE("nncp-reass", append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, "") + ctx.LogE( + "reass-chunk", + append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, + func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum) + }, + ) hasErrors = true } } @@ -225,15 +244,21 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo log.Fatalln("Can not close:", err) } } - ctx.LogD("nncp-reass", les, "written") + ctx.LogD("reass-written", les, func(les nncp.LEs) string { + return logMsg(les) + ": written" + }) if !keep { if err = os.Remove(path); err != nil { - ctx.LogE("nncp-reass", les, err, "") + ctx.LogE("reass-removing", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": removing" + }) hasErrors = true } } if stdout { - ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done") + ctx.LogI("reass", nncp.LEs{{K: "Path", V: path}}, func(les nncp.LEs) string { + return logMsg(les) + ": done" + }) return !hasErrors } @@ -256,21 +281,26 @@ func process(ctx *nncp.Ctx, path string, keep, dryRun, stdout, dumpMeta bool) bo if err = nncp.DirSync(mainDir); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done") + ctx.LogI("reass", nncp.LEs{{K: "Path", V: path}}, func(les nncp.LEs) string { + return logMsg(les) + ": done" + }) return !hasErrors } func findMetas(ctx *nncp.Ctx, dirPath string) []string { dir, err := os.Open(dirPath) defer dir.Close() + logMsg := func(les nncp.LEs) string { + return "Finding .meta in " + dirPath + } if err != nil { - ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "") + ctx.LogE("reass", nncp.LEs{{K: "Path", V: dirPath}}, err, logMsg) return nil } fis, err := dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "") + ctx.LogE("reass", nncp.LEs{{K: "Path", V: dirPath}}, err, logMsg) return nil } metaPaths := make([]string, 0) diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index ce908c7..d8add6e 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -122,10 +122,14 @@ func main() { return nil } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") + ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: too fresh, skipping", path) + }) return nil } - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: removed", path) + }) if *dryRun { return nil } @@ -145,7 +149,9 @@ func main() { return nil } if strings.HasSuffix(info.Name(), ".lock") { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: removed", path) + }) if *dryRun { return nil } @@ -176,22 +182,27 @@ func main() { if info.IsDir() { return nil } + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf("File %s: removed", path) + } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") + ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: too fresh, skipping", path) + }) return nil } if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) || (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) || (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) || (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) if *dryRun { return nil } return os.Remove(path) } if *pktRaw != "" && filepath.Base(info.Name()) == *pktRaw { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) if *dryRun { return nil } @@ -200,7 +211,7 @@ func main() { if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) if *dryRun { return nil } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index 1dc3ee2..05b3909 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -28,6 +28,7 @@ import ( "os" "path/filepath" + "github.com/dustin/go-humanize" "go.cypherpunks.ru/nncp/v6" ) @@ -105,6 +106,7 @@ func main() { var dir *os.File var fis []os.FileInfo var les nncp.LEs + var logMsg func(les nncp.LEs) string if *txOnly { goto Tx } @@ -112,26 +114,37 @@ func main() { {K: "XX", V: string(nncp.TRx)}, {K: "Dir", V: selfPath}, } - ctx.LogD("nncp-xfer", les, "self") + logMsg = func(les nncp.LEs) string { + return "Packet transfer, received from self" + } + ctx.LogD("xfer-self", les, logMsg) if _, err = os.Stat(selfPath); err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "no dir") + ctx.LogD("xfer-self-no-dir", les, func(les nncp.LEs) string { + return logMsg(les) + ": no directory" + }) goto Tx } - ctx.LogE("nncp-xfer", les, err, "stat") + ctx.LogE("xfer-self-stat", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": stating" + }) isBad = true goto Tx } dir, err = os.Open(selfPath) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-self-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true goto Tx } fis, err = dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", les, err, "read") + ctx.LogE("xfer-self-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + }) isBad = true goto Tx } @@ -141,28 +154,41 @@ func main() { } nodeId, err := nncp.NodeIdFromString(fi.Name()) les := append(les, nncp.LE{K: "Node", V: fi.Name()}) + logMsg := func(les nncp.LEs) string { + return "Packet transfer, received from " + ctx.NodeName(nodeId) + } if err != nil { - ctx.LogD("nncp-xfer", les, "is not NodeId") + ctx.LogD("xfer-rx-not-node", les, func(les nncp.LEs) string { + return logMsg(les) + ": is not NodeId" + }) continue } if nodeOnly != nil && *nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", les, "skip") + ctx.LogD("xfer-rx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": skipping" + }) continue } if _, known := ctx.Neigh[*nodeId]; !known { - ctx.LogD("nncp-xfer", les, "unknown") + ctx.LogD("xfer-rx-unknown", les, func(les nncp.LEs) string { + return logMsg(les) + ": unknown" + }) continue } dir, err = os.Open(filepath.Join(selfPath, fi.Name())) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true continue } fisInt, err := dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", les, err, "read") + ctx.LogE("xfer-rx-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + }) isBad = true continue } @@ -176,26 +202,45 @@ func main() { } filename := filepath.Join(dir.Name(), fiInt.Name()) les := append(les, nncp.LE{K: "File", V: filename}) + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, received from %s: %s", + ctx.NodeName(nodeId), filename, + ) + } fd, err := os.Open(filename) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true continue } pktEnc, pktEncRaw, err := ctx.HdrRead(fd) if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-xfer", les, "is not a packet") + ctx.LogD("xfer-rx-not-packet", les, func(les nncp.LEs) string { + return logMsg(les) + ": is not a packet" + }) fd.Close() // #nosec G104 continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-xfer", les, "too nice") + ctx.LogD("xfer-rx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) fd.Close() // #nosec G104 continue } les = append(les, nncp.LE{K: "Size", V: fiInt.Size()}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, received from %s: %s (%s)", + ctx.NodeName(nodeId), filename, + humanize.IBytes(uint64(fiInt.Size())), + ) + } if !ctx.IsEnoughSpace(fiInt.Size()) { - ctx.LogE("nncp-xfer", les, errors.New("is not enough space"), "") + ctx.LogE("xfer-rx", les, errors.New("is not enough space"), logMsg) fd.Close() // #nosec G104 continue } @@ -213,19 +258,20 @@ func main() { err = w.Close() } if err != nil { - ctx.LogE("nncp-xfer", les, err, "copy") + ctx.LogE("xfer-rx", les, err, logMsg) w.CloseWithError(err) // #nosec G104 } }() if _, err = nncp.CopyProgressed( tmp.W, r, "Rx", - append(les, nncp.LEs{ - {K: "Pkt", V: filename}, - {K: "FullSize", V: fiInt.Size()}, - }...), + append( + les, + nncp.LE{K: "Pkt", V: filename}, + nncp.LE{K: "FullSize", V: fiInt.Size()}, + ), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("nncp-xfer", les, err, "copy") + ctx.LogE("xfer-rx", les, err, logMsg) isBad = true } fd.Close() // #nosec G104 @@ -240,10 +286,10 @@ func main() { )); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-xfer", les, "") + ctx.LogI("xfer-rx", les, logMsg) if !*keep { if err = os.Remove(filename); err != nil { - ctx.LogE("nncp-xfer", les, err, "remove") + ctx.LogE("xfer-rx-remove", les, err, logMsg) isBad = true } } @@ -266,12 +312,14 @@ Tx: return } for nodeId := range ctx.Neigh { - les := nncp.LEs{ - {K: "XX", V: string(nncp.TTx)}, - {K: "Node", V: nodeId}, + les := nncp.LEs{{K: "XX", V: string(nncp.TTx)}, {K: "Node", V: nodeId}} + logMsg := func(les nncp.LEs) string { + return "Packet transfer, sent to " + ctx.NodeName(&nodeId) } if nodeOnly != nil && nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", les, "skip") + ctx.LogD("xfer-tx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": skipping" + }) continue } dirLock, err := ctx.LockDir(&nodeId, string(nncp.TTx)) @@ -280,41 +328,55 @@ Tx: } nodePath := filepath.Join(flag.Arg(0), nodeId.String()) les = append(les, nncp.LE{K: "Dir", V: nodePath}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: directory %s", + ctx.NodeName(&nodeId), nodePath, + ) + } _, err = os.Stat(nodePath) if err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "does not exist") + ctx.LogD("xfer-tx-not-exist", les, func(les nncp.LEs) string { + return logMsg(les) + ": does not exist" + }) if !*mkdir { ctx.UnlockDir(dirLock) continue } if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "mkdir") + ctx.LogE("xfer-tx-mkdir", les, err, logMsg) isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "stat") + ctx.LogE("xfer-tx", les, err, logMsg) isBad = true continue } } dstPath := filepath.Join(nodePath, ctx.SelfId.String()) les[len(les)-1].V = dstPath + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: directory %s", + ctx.NodeName(&nodeId), dstPath, + ) + } _, err = os.Stat(dstPath) if err != nil { if os.IsNotExist(err) { if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "mkdir") + ctx.LogE("xfer-tx-mkdir", les, err, logMsg) isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "stat") + ctx.LogE("xfer-tx", les, err, logMsg) isBad = true continue } @@ -323,29 +385,47 @@ Tx: for job := range ctx.Jobs(&nodeId, nncp.TTx) { pktName := filepath.Base(job.Path) les := append(les, nncp.LE{K: "Pkt", V: pktName}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: %s", + ctx.NodeName(&nodeId), pktName, + ) + } if job.PktEnc.Nice > nice { - ctx.LogD("nncp-xfer", les, "too nice") + ctx.LogD("xfer-tx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "already exists") + ctx.LogD("xfer-tx-exists", les, func(les nncp.LEs) string { + return logMsg(les) + ": already exists" + }) continue } if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "already exists") + ctx.LogD("xfer-tx-seen", les, func(les nncp.LEs) string { + return logMsg(les) + ": already seen" + }) continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { - ctx.LogE("nncp-xfer", les, err, "mktemp") + ctx.LogE("xfer-tx-mktemp", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": mktemp" + }) isBad = true break } les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) - ctx.LogD("nncp-xfer", les, "created") + ctx.LogD("xfer-tx-tmp-create", les, func(les nncp.LEs) string { + return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name()) + }) fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-tx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) tmp.Close() // #nosec G104 isBad = true continue @@ -358,42 +438,64 @@ Tx: ) fd.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", les, err, "copy") + ctx.LogE("xfer-tx-copy", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": copying" + }) tmp.Close() // #nosec G104 isBad = true continue } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("nncp-xfer", les, err, "flush") + ctx.LogE("xfer-tx-flush", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": flushing" + }) isBad = true continue } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("nncp-xfer", les, err, "sync") + ctx.LogE("xfer-tx-sync", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": syncing" + }) isBad = true continue } if err = tmp.Close(); err != nil { - ctx.LogE("nncp-xfer", les, err, "sync") + ctx.LogE("xfer-tx-close", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": closing" + }) } if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil { - ctx.LogE("nncp-xfer", les, err, "rename") + ctx.LogE("xfer-tx-rename", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": renaming" + }) isBad = true continue } if err = nncp.DirSync(dstPath); err != nil { - ctx.LogE("nncp-xfer", les, err, "sync") + ctx.LogE("xfer-tx-dirsync", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": dirsyncing" + }) isBad = true continue } os.Remove(filepath.Join(dstPath, pktName+".part")) // #nosec G104 les = les[:len(les)-1] - ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "") + ctx.LogI( + "xfer-tx", + append(les, nncp.LE{K: "Size", V: copied}), + func(les nncp.LEs) string { + return fmt.Sprintf( + "%s (%s)", logMsg(les), humanize.IBytes(uint64(copied)), + ) + }, + ) if !*keep { if err = os.Remove(job.Path); err != nil { - ctx.LogE("nncp-xfer", les, err, "remove") + ctx.LogE("xfer-tx-remove", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": removing" + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + nncp.HdrSuffix) diff --git a/src/ctx.go b/src/ctx.go index d838828..88fd852 100644 --- a/src/ctx.go +++ b/src/ctx.go @@ -19,6 +19,7 @@ package nncp import ( "errors" + "fmt" "io/ioutil" "log" "os" @@ -65,13 +66,16 @@ func (ctx *Ctx) FindNode(id string) (*Node, error) { func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error { dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx)) + logMsg := func(les LEs) string { + return fmt.Sprintf("Ensuring directory %s existence", dirPath) + } if err := os.MkdirAll(dirPath, os.FileMode(0777)); err != nil { - ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "") + ctx.LogE("dir-ensure-mkdir", LEs{{"Dir", dirPath}}, err, logMsg) return err } fd, err := os.Open(dirPath) if err != nil { - ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "") + ctx.LogE("dir-ensure-open", LEs{{"Dir", dirPath}}, err, logMsg) return err } return fd.Close() diff --git a/src/humanizer.go b/src/humanizer.go index a4e0a27..69d1bb5 100644 --- a/src/humanizer.go +++ b/src/humanizer.go @@ -18,16 +18,22 @@ along with this program. If not, see . package nncp import ( - "errors" "fmt" - "strconv" "strings" "time" - "github.com/dustin/go-humanize" "go.cypherpunks.ru/recfile" ) +func (ctx *Ctx) NodeName(id *NodeId) string { + idS := id.String() + node, err := ctx.FindNode(idS) + if err == nil { + return node.Name + } + return idS +} + func (ctx *Ctx) HumanizeRec(rec string) string { r := recfile.NewReader(strings.NewReader(rec)) le, err := r.NextMap() @@ -42,265 +48,15 @@ func (ctx *Ctx) HumanizeRec(rec string) string { } func (ctx *Ctx) Humanize(le map[string]string) (string, error) { - nodeS := le["Node"] - node, err := ctx.FindNode(nodeS) - if err == nil { - nodeS = node.Name - } - var sizeParsed uint64 - var size string - if sizeRaw, exists := le["Size"]; exists { - sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64) - if err != nil { - return "", err - } - size = humanize.IBytes(sizeParsed) - } - - var msg string - switch le["Who"] { - case "tx": - switch le["Type"] { - case "file": - msg = fmt.Sprintf( - "File %s (%s) transfer to %s:%s: %s", - le["Src"], size, nodeS, le["Dst"], le["Msg"], - ) - case "freq": - msg = fmt.Sprintf( - "File request from %s:%s to %s: %s", - nodeS, le["Src"], le["Dst"], le["Msg"], - ) - case "exec": - msg = fmt.Sprintf( - "Exec to %s@%s (%s): %s", - nodeS, le["Dst"], size, le["Msg"], - ) - case "trns": - msg = fmt.Sprintf( - "Transitional packet to %s (%s) (nice %s): %s", - nodeS, size, le["Nice"], le["Msg"], - ) - default: - return "", errors.New("unknown \"tx\" type") - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "rx": - switch le["Type"] { - case "exec": - msg = fmt.Sprintf("Got exec from %s to %s (%s)", nodeS, le["Dst"], size) - case "file": - msg = fmt.Sprintf("Got file %s (%s) from %s", le["Dst"], size, nodeS) - case "freq": - msg = fmt.Sprintf("Got file request %s to %s", le["Src"], nodeS) - case "trns": - nodeT := le["Dst"] - node, err := ctx.FindNode(nodeT) - if err == nil { - nodeT = node.Name - } - msg = fmt.Sprintf( - "Got transitional packet from %s to %s (%s)", - nodeS, nodeT, size, - ) - default: - return "", errors.New("unknown \"rx\" type") - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "check": - msg = fmt.Sprintf("Checking: %s/%s/%s", le["Node"], le["XX"], le["Pkt"]) - if err, exists := le["Err"]; exists { - msg += fmt.Sprintf(" %s", err) - } - case "nncp-xfer": - switch le["XX"] { - case "rx": - msg = "Packet transfer, received from" - case "tx": - msg = "Packet transfer, sent to" - default: - return "", errors.New("unknown XX") - } - if nodeS != "" { - msg += " node " + nodeS - } - if size != "" { - msg += fmt.Sprintf(" (%s)", size) - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } else { - msg += " " + le["Msg"] - } - case "nncp-bundle": - switch le["XX"] { - case "rx": - msg = "Bundle transfer, received from" - case "tx": - msg = "Bundle transfer, sent to" - default: - return "", errors.New("unknown XX") - } - if nodeS != "" { - msg += " node " + nodeS - } - msg += " " + le["Pkt"] - if size != "" { - msg += fmt.Sprintf(" (%s)", size) - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "nncp-rm": - msg += "removing " + le["File"] - case "call-start": - msg = fmt.Sprintf("Connection to %s", nodeS) - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "call-finish": - rx, err := strconv.ParseUint(le["RxBytes"], 10, 64) - if err != nil { - return "", err - } - rxs, err := strconv.ParseUint(le["RxSpeed"], 10, 64) - if err != nil { - return "", err - } - tx, err := strconv.ParseUint(le["TxBytes"], 10, 64) - if err != nil { - return "", err - } - txs, err := strconv.ParseUint(le["TxSpeed"], 10, 64) - if err != nil { - return "", err - } - msg = fmt.Sprintf( - "Finished call with %s: %s received (%s/sec), %s transferred (%s/sec)", - nodeS, - humanize.IBytes(uint64(rx)), humanize.IBytes(uint64(rxs)), - humanize.IBytes(uint64(tx)), humanize.IBytes(uint64(txs)), - ) - case "sp-start": - if nodeS == "" { - msg += "SP" - if peer, exists := le["Peer"]; exists { - msg += fmt.Sprintf(": %s", peer) - } - } else { - nice, err := NicenessParse(le["Nice"]) - if err != nil { - return "", err - } - msg += fmt.Sprintf("SP with %s (nice %s)", nodeS, NicenessFmt(nice)) - } - if m, exists := le["Msg"]; exists { - msg += ": " + m - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "sp-info": - nice, err := NicenessParse(le["Nice"]) - if err != nil { - return "", err - } - msg = fmt.Sprintf( - "Packet %s (%s) (nice %s)", - le["Pkt"], size, NicenessFmt(nice), - ) - if offset := le["Offset"]; offset != "" { - offsetParsed, err := strconv.ParseUint(offset, 10, 64) - if err != nil { - return "", err - } - msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) - } - if m, exists := le["Msg"]; exists { - msg += ": " + m - } - case "sp-infos": - switch le["XX"] { - case "rx": - msg = fmt.Sprintf("%s has got for us: ", nodeS) - case "tx": - msg = fmt.Sprintf("We have got for %s: ", nodeS) - default: - return "", errors.New("unknown XX") - } - msg += fmt.Sprintf("%s packets, %s", le["Pkts"], size) - case "sp-process": - msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, le["Pkt"], size, le["Msg"]) - case "sp-file": - switch le["XX"] { - case "rx": - msg = "Got packet " - case "tx": - msg = "Sent packet " - default: - return "", errors.New("unknown XX") - } - fullsize := uint64(1) - if raw, exists := le["FullSize"]; exists { - fullsize, err = strconv.ParseUint(raw, 10, 64) - if err != nil { - return "", err - } - } - msg += fmt.Sprintf( - "%s %d%% (%s / %s)", - le["Pkt"], - 100*sizeParsed/fullsize, - humanize.IBytes(uint64(sizeParsed)), - humanize.IBytes(uint64(fullsize)), - ) - if m, exists := le["Msg"]; exists { - msg += ": " + m - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "sp-done": - switch le["XX"] { - case "rx": - msg = fmt.Sprintf("Packet %s is retreived (%s)", le["Pkt"], size) - case "tx": - msg = fmt.Sprintf("Packet %s is sent", le["Pkt"]) - default: - return "", errors.New("unknown XX") - } - case "nncp-reass": - chunkNum, exists := le["Chunk"] - if exists { - msg = fmt.Sprintf( - "Reassembling chunked file \"%s\" (chunk %s): %s", - le["Path"], chunkNum, le["Msg"], - ) - } else { - msg = fmt.Sprintf( - "Reassembling chunked file \"%s\": %s", - le["Path"], le["Msg"], - ) - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "lockdir": - msg = fmt.Sprintf("Acquire lock for %s: %s", le["Path"], le["Err"]) - default: - return "", errors.New("unknown Who") - } when, err := time.Parse(time.RFC3339Nano, le["When"]) if err != nil { return "", err } var level string - if _, isErr := le["Err"]; isErr { + msg := le["Msg"] + if errMsg, isErr := le["Err"]; isErr { level = "ERROR " + msg += ": " + errMsg } return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg), nil } diff --git a/src/jobs.go b/src/jobs.go index 37a31b1..1b5cef8 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -19,11 +19,13 @@ package nncp import ( "bytes" + "fmt" "os" "path/filepath" "strings" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" ) type TRxTx string @@ -58,21 +60,29 @@ func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) { func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { tmpHdr, err := ctx.NewTmpFile() if err != nil { - ctx.LogE("hdr-write", []LE{}, err, "new") + ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string { + return "Header writing: new temporary file" + }) return err } if _, err = tmpHdr.Write(pktEncRaw); err != nil { - ctx.LogE("hdr-write", []LE{}, err, "write") + ctx.LogE("hdr-write-write", nil, err, func(les LEs) string { + return "Header writing: writing" + }) os.Remove(tmpHdr.Name()) return err } if err = tmpHdr.Close(); err != nil { - ctx.LogE("hdr-write", []LE{}, err, "close") + ctx.LogE("hdr-write-close", nil, err, func(les LEs) string { + return "Header writing: closing" + }) os.Remove(tmpHdr.Name()) return err } if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil { - ctx.LogE("hdr-write", []LE{}, err, "rename") + ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string { + return "Header writing: renaming" + }) return err } return err @@ -132,13 +142,20 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { if err != nil || pktEnc.Magic != MagicNNCPEv4 { continue } - ctx.LogD("jobs", LEs{ + ctx.LogD("job", LEs{ {"XX", string(xx)}, {"Node", pktEnc.Sender}, {"Name", name}, {"Nice", int(pktEnc.Nice)}, {"Size", fi.Size()}, - }, "taken") + }, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s/%s nice: %s size: %s", + pktEnc.Sender, string(xx), name, + NicenessFmt(pktEnc.Nice), + humanize.IBytes(uint64(fi.Size())), + ) + }) if !hdrExists && ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, pth) } diff --git a/src/lockdir.go b/src/lockdir.go index 2e6ac20..2219d1b 100644 --- a/src/lockdir.go +++ b/src/lockdir.go @@ -26,7 +26,6 @@ import ( func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) { if err := ctx.ensureRxDir(nodeId); err != nil { - ctx.LogE("lockdir", LEs{}, err, "") return nil, err } lockPath := filepath.Join(ctx.Spool, nodeId.String(), lockCtx) + ".lock" @@ -36,12 +35,16 @@ func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) { os.FileMode(0666), ) if err != nil { - ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "") + ctx.LogE("lockdir-open", LEs{{"Path", lockPath}}, err, func(les LEs) string { + return "Locking directory: opening %s" + lockPath + }) return nil, err } err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB) if err != nil { - ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "") + ctx.LogE("lockdir-flock", LEs{{"Path", lockPath}}, err, func(les LEs) string { + return "Locking directory: locking %s" + lockPath + }) dirLock.Close() // #nosec G104 return nil, err } diff --git a/src/log.go b/src/log.go index 25a4946..7bb59a3 100644 --- a/src/log.go +++ b/src/log.go @@ -94,22 +94,18 @@ func (ctx *Ctx) Log(rec string) { fd.Close() // #nosec G104 } -func (ctx *Ctx) LogD(who string, les LEs, msg string) { +func (ctx *Ctx) LogD(who string, les LEs, msg func(LEs) string) { if !ctx.Debug { return } les = append(LEs{{"Debug", true}, {"Who", who}}, les...) - if msg != "" { - les = append(les, LE{"Msg", msg}) - } + les = append(les, LE{"Msg", msg(les)}) fmt.Fprint(os.Stderr, les.Rec()) } -func (ctx *Ctx) LogI(who string, les LEs, msg string) { +func (ctx *Ctx) LogI(who string, les LEs, msg func(LEs) string) { les = append(LEs{{"Who", who}}, les...) - if msg != "" { - les = append(les, LE{"Msg", msg}) - } + les = append(les, LE{"Msg", msg(les)}) rec := les.Rec() if !ctx.Quiet { fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec)) @@ -117,11 +113,9 @@ func (ctx *Ctx) LogI(who string, les LEs, msg string) { ctx.Log(rec) } -func (ctx *Ctx) LogE(who string, les LEs, err error, msg string) { +func (ctx *Ctx) LogE(who string, les LEs, err error, msg func(LEs) string) { les = append(LEs{{"Err", err.Error()}, {"Who", who}}, les...) - if msg != "" { - les = append(les, LE{"Msg", msg}) - } + les = append(les, LE{"Msg", msg(les)}) rec := les.Rec() if !ctx.Quiet { fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec)) diff --git a/src/nice.go b/src/nice.go index eceb6fc..7682a79 100644 --- a/src/nice.go +++ b/src/nice.go @@ -106,3 +106,17 @@ func NicenessFmt(nice uint8) string { } return strconv.Itoa(int(nice)) } + +type ByNice []*SPInfo + +func (a ByNice) Len() int { + return len(a) +} + +func (a ByNice) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a ByNice) Less(i, j int) bool { + return a[i].Nice < a[j].Nice +} diff --git a/src/nncp.go b/src/nncp.go index 16e5b9a..b6f6d1a 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -40,7 +40,7 @@ along with this program. If not, see .` const Base32Encoded32Len = 52 var ( - Version string = "6.1.0" + Version string = "6.2.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/sortbynice.go b/src/sortbynice.go deleted file mode 100644 index 1e02796..0000000 --- a/src/sortbynice.go +++ /dev/null @@ -1,15 +0,0 @@ -package nncp - -type ByNice []*SPInfo - -func (a ByNice) Len() int { - return len(a) -} - -func (a ByNice) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -func (a ByNice) Less(i, j int) bool { - return a[i].Nice < a[j].Nice -} diff --git a/src/sp.go b/src/sp.go index 6f1af9f..f2c707f 100644 --- a/src/sp.go +++ b/src/sp.go @@ -21,6 +21,7 @@ import ( "bytes" "crypto/subtle" "errors" + "fmt" "hash" "io" "os" @@ -30,6 +31,7 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/flynn/noise" "golang.org/x/crypto/blake2b" ) @@ -277,19 +279,32 @@ func (state *SPState) dirUnlock() { func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { for hshValue := range appeared { + pktName := Base32Codec.EncodeToString(hshValue[:]) les := LEs{ {"XX", string(TRx)}, {"Node", nodeId}, - {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, + {"Pkt", pktName}, } - ctx.LogD("sp-checker", les, "checking") + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName) + }) size, err := ctx.CheckNoCK(nodeId, hshValue) les = append(les, LE{"Size", size}) if err != nil { - ctx.LogE("sp-checker", les, err, "") + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) continue } - ctx.LogI("sp-done", les, "") + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) go func(hsh *[32]byte) { checked <- hsh }(hshValue) } } @@ -353,19 +368,34 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var payloads [][]byte for _, info := range infos { payloads = append(payloads, MarshalSP(SPTypeInfo, info)) + pktName := Base32Codec.EncodeToString(info.Hash[:]) ctx.LogD("sp-info-our", LEs{ {"Node", nodeId}, - {"Name", Base32Codec.EncodeToString(info.Hash[:])}, + {"Name", pktName}, {"Size", info.Size}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "Our info: %s/tx/%s (%s)", + ctx.NodeName(nodeId), + pktName, + humanize.IBytes(info.Size), + ) + }) } if totalSize > 0 { - ctx.LogI("sp-infos", LEs{ + ctx.LogI("sp-infos-tx", LEs{ {"XX", string(TTx)}, {"Node", nodeId}, {"Pkts", len(payloads)}, {"Size", totalSize}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "We have got for %s: %d packets, %s", + ctx.NodeName(nodeId), + len(payloads), + humanize.IBytes(uint64(totalSize)), + ) + }) } return payloadsSplit(payloads) } @@ -435,30 +465,72 @@ func (state *SPState) StartI(conn ConnDeadlined) error { return err } les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}} - state.Ctx.LogD("sp-start", les, "sending first message") + state.Ctx.LogD("sp-startI", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "waiting for first message") + state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading Noise message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "starting workers") + state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() } return err @@ -490,14 +562,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var buf []byte var payload []byte - state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP nice %s: waiting for first message", + NicenessFmt(state.Nice), + ) + } + les := LEs{{"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startR", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", LEs{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", LEs{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } @@ -510,15 +589,16 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } if node == nil { peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "") - return errors.New("Unknown peer: " + peerId) + err = errors.New("unknown peer: " + peerId) + state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg) + return err } state.Node = node state.rxRate = node.RxRate state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} + les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err @@ -553,7 +633,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error { firstPayload = append(firstPayload, SPHaltMarshalized...) } - state.Ctx.LogD("sp-start", les, "sending first message") + state.Ctx.LogD("sp-startR-write", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + node.Name, NicenessFmt(state.Nice), + ) + }) buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) if err != nil { state.dirUnlock() @@ -561,11 +646,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + node.Name, NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "starting workers") + state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + node.Name, NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() @@ -633,9 +728,15 @@ func (state *SPState) StartWorkers( go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( - "sp-work", + "sp-queue-remaining", append(les, LE{"Size", len(payload)}), - "queuing remaining payload", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing remaining payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -644,23 +745,32 @@ func (state *SPState) StartWorkers( } // Processing of first payload and queueing its responses - state.Ctx.LogD( - "sp-work", - append(les, LE{"Size", len(payload)}), - "processing first payload", - ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): processing first payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", les, err, "") + state.Ctx.LogE("sp-process", les, err, logMsg) return err } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-work", + "sp-queue-reply", append(les, LE{"Size", len(reply)}), - "queuing reply", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- reply } @@ -717,9 +827,15 @@ func (state *SPState) StartWorkers( &state.infosOurSeen, ) { state.Ctx.LogD( - "sp-work", + "sp-queue-info", append(les, LE{"Size", len(payload)}), - "queuing new info", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing new info (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -742,14 +858,25 @@ func (state *SPState) StartWorkers( var ping bool select { case <-state.pings: - state.Ctx.LogD("sp-xmit", les, "got ping") + state.Ctx.LogD("sp-got-ping", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got ping", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) payload = SPPingMarshalized ping = true case payload = <-state.payloads: state.Ctx.LogD( - "sp-xmit", + "sp-got-payload", append(les, LE{"Size", len(payload)}), - "got payload", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) default: state.RLock() @@ -763,12 +890,24 @@ func (state *SPState) StartWorkers( if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - lesp := append(les, LEs{ - {"XX", string(TTx)}, - {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}, - {"Size", int64(freq.Offset)}, - }...) - state.Ctx.LogD("sp-file", lesp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp := append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(freq.Offset)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): tx/%s (%s)", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(freq.Offset), + ) + } + state.Ctx.LogD("sp-queue", lesp, func(les LEs) string { + return logMsg(les) + ": queueing" + }) pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), @@ -779,12 +918,16 @@ func (state *SPState) StartWorkers( if !exists { fd, err := os.Open(pth) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening" + }) return } fi, err := fd.Stat() if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string { + return logMsg(les) + ": stating" + }) return } fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} @@ -794,19 +937,34 @@ func (state *SPState) StartWorkers( fullSize := fdAndFullSize.fullSize var buf []byte if freq.Offset < uint64(fullSize) { - state.Ctx.LogD("sp-file", lesp, "seeking") + state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { + return logMsg(les) + ": seeking" + }) if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": reading" + }) return } buf = buf[:n] - state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read") + state.Ctx.LogD( + "sp-file-read", + append(lesp, LE{"Size", n}), + func(les LEs) string { + return fmt.Sprintf( + "%s: read %s", + logMsg(les), humanize.IBytes(uint64(n)), + ) + }, + ) } state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ @@ -815,15 +973,16 @@ func (state *SPState) StartWorkers( Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - lesp = append(lesp, LE{"Size", int64(ourSize)}) - lesp = append(lesp, LE{"FullSize", fullSize}) + lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize}) if state.Ctx.ShowPrgrs { Progress("Tx", lesp) } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file", lesp, "finished") + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { @@ -833,14 +992,23 @@ func (state *SPState) StartWorkers( state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { - state.Ctx.LogD("sp-file", lesp, "queue disappeared") + state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string { + return logMsg(les) + ": queue disappeared" + }) } state.Unlock() } - state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending %s", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { - state.Ctx.LogE("sp-xmit", les, err, "") + state.Ctx.LogE("sp-sending", les, err, logMsg) return } } @@ -853,7 +1021,13 @@ func (state *SPState) StartWorkers( if state.NotAlive() { break } - state.Ctx.LogD("sp-recv", les, "waiting for payload") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for payload", + state.Node.Name, NicenessFmt(state.Nice), + ) + } + state.Ctx.LogD("sp-recv-wait", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { @@ -867,36 +1041,55 @@ func (state *SPState) StartWorkers( if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-wait", les, err, logMsg) break } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } state.Ctx.LogD( - "sp-recv", + "sp-recv-got", append(les, LE{"Size", len(payload)}), - "got payload", + func(les LEs) string { return logMsg(les) + ": got" }, ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string { + return logMsg(les) + ": got" + }) break } state.Ctx.LogD( - "sp-recv", + "sp-recv-process", append(les, LE{"Size", len(payload)}), - "processing", + func(les LEs) string { + return logMsg(les) + ": processing" + }, ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string { + return logMsg(les) + ": processing" + }) break } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-recv", - append(les, LE{"Size", len(reply)}), - "queuing reply", + "sp-recv-reply", + append(les[:len(les)-1], LE{"Size", len(reply)}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(reply))), + ) + }, ) state.payloads <- reply } @@ -940,10 +1133,20 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { var replies [][]byte var infosGot bool for r.Len() > 0 { - state.Ctx.LogD("sp-process", les, "unmarshaling header") + state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", les, err, "") + state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } if head.Type != SPTypePing { @@ -951,62 +1154,126 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } switch head.Type { case SPTypeHalt: - state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "") + state.Ctx.LogD( + "sp-process-halt", + append(les, LE{"Type", "halt"}), func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got HALT", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) state.Lock() state.queueTheir = nil state.Unlock() case SPTypePing: - state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") + state.Ctx.LogD( + "sp-process-ping", + append(les, LE{"Type", "ping"}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got PING", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) case SPTypeInfo: infosGot = true lesp := append(les, LE{"Type", "info"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD( + "sp-process-info-unmarshal", lesp, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE( + "sp-process-info-unmarshal", lesp, err, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) return nil, err } - lesp = append(lesp, LEs{ - {"Pkt", Base32Codec.EncodeToString(info.Hash[:])}, - {"Size", int64(info.Size)}, - {"Nice", int(info.Nice)}, - }...) + pktName := Base32Codec.EncodeToString(info.Hash[:]) + lesp = append( + lesp, + LE{"Pkt", pktName}, + LE{"Size", int64(info.Size)}, + LE{"PktNice", int(info.Nice)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): INFO %s (%s) nice %s", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if !state.listOnly && info.Nice > state.Nice { - state.Ctx.LogD("sp-process", lesp, "too nice") + state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string { + return logMsg(les) + ": too nice" + }) continue } - state.Ctx.LogD("sp-process", lesp, "received") + state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string { + return logMsg(les) + ": received" + }) if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.Ctx.LogD("sp-process", lesp, "stating part") + state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string { + return logMsg(les) + ": stating part" + }) pktPath := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), Base32Codec.EncodeToString(info.Hash[:]), ) + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Packet %s (%s) (nice %s)", + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if _, err = os.Stat(pktPath); err == nil { - state.Ctx.LogI("sp-info", lesp, "already done") + state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string { + return logMsg(les) + ": already done" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + SeenSuffix); err == nil { - state.Ctx.LogI("sp-info", lesp, "already seen") + state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { + return logMsg(les) + ": already seen" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { - state.Ctx.LogI("sp-info", lesp, "still non checksummed") + state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string { + return logMsg(les) + ": still not checksummed" + }) continue } fi, err := os.Stat(pktPath + PartSuffix) @@ -1015,10 +1282,20 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { offset = fi.Size() } if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { - state.Ctx.LogI("sp-info", lesp, "not enough space") + state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string { + return logMsg(les) + ": not enough space" + }) continue } - state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "") + state.Ctx.LogI( + "sp-info", + append(lesp, LE{"Offset", offset}), + func(les LEs) string { + return fmt.Sprintf( + "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size, + ) + }, + ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { replies = append(replies, MarshalSP( SPTypeFreq, @@ -1028,25 +1305,45 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { case SPTypeFile: lesp := append(les, LE{"Type", "file"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LEs{ - {"XX", string(TRx)}, - {"Pkt", Base32Codec.EncodeToString(file.Hash[:])}, - {"Size", len(file.Payload)}, - }...) + pktName := Base32Codec.EncodeToString(file.Hash[:]) + lesp = append( + lesp, + LE{"XX", string(TRx)}, + LE{"Pkt", pktName}, + LE{"Size", len(file.Payload)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Got packet %s (%s)", + pktName, humanize.IBytes(uint64(len(file.Payload))), + ) + } dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), ) - filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) + filePath := filepath.Join(dirToSync, pktName) filePathPart := filePath + PartSuffix - state.Ctx.LogD("sp-file", lesp, "opening part") + state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening part" + }) fdAndFullSize, exists := state.fds[filePathPart] var fd *os.File if exists { @@ -1058,7 +1355,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { os.FileMode(0666), ) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening part" + }) return nil, err } state.fds[filePathPart] = FdAndFullSize{fd: fd} @@ -1070,15 +1369,26 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.fileHashers[filePath] = &HasherAndOffset{h: h} } } - state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") + state.Ctx.LogD( + "sp-file-seek", + append(lesp, LE{"Offset", file.Offset}), + func(les LEs) string { + return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset) + }) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) state.closeFd(filePathPart) return nil, err } - state.Ctx.LogD("sp-file", lesp, "writing") + state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string { + return logMsg(les) + ": writing" + }) if _, err = fd.Write(file.Payload); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string { + return logMsg(les) + ": writing" + }) state.closeFd(filePathPart) return nil, err } @@ -1090,10 +1400,11 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } hasherAndOffset.offset += uint64(len(file.Payload)) } else { - state.Ctx.LogE( - "sp-file", lesp, - errors.New("offset differs"), - "deleting hasher", + state.Ctx.LogD( + "sp-file-offset-differs", lesp, + func(les LEs) string { + return logMsg(les) + ": offset differs, deleting hasher" + }, ) delete(state.fileHashers, filePath) hasherExists = false @@ -1115,26 +1426,46 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if fullsize != ourSize { continue } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Got packet %s %d%% (%s / %s)", + pktName, 100*ourSize/fullsize, + humanize.IBytes(uint64(ourSize)), + humanize.IBytes(uint64(fullsize)), + ) + } err = fd.Sync() if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) state.closeFd(filePathPart) continue } if hasherExists { if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { - state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) continue } if err = os.Rename(filePathPart, filePath); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) continue } - state.Ctx.LogI("sp-file", lesp, "done") + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" + }) state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) @@ -1148,14 +1479,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } if _, err = fd.Seek(0, io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "seek") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) state.closeFd(filePathPart) continue } _, pktEncRaw, err := state.Ctx.HdrRead(fd) state.closeFd(filePathPart) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "HdrRead") + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) continue } state.Ctx.HdrWrite(pktEncRaw, filePath) @@ -1163,14 +1498,20 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.closeFd(filePathPart) if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) continue } - state.Ctx.LogI("sp-file", lesp, "downloaded") + state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string { + return logMsg(les) + ": downloaded" + }) state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() @@ -1180,41 +1521,74 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { case SPTypeDone: lesp := append(les, LE{"Type", "done"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) - lesp = append(lesp, LE{"XX", string(TTx)}) - state.Ctx.LogD("sp-done", lesp, "removing") + pktName := Base32Codec.EncodeToString(done.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): DONE: removing %s", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + } + state.Ctx.LogD("sp-done", lesp, logMsg) pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - Base32Codec.EncodeToString(done.Hash[:]), + pktName, ) if err = os.Remove(pth); err == nil { - state.Ctx.LogI("sp-done", lesp, "") + state.Ctx.LogI("sp-done", lesp, func(les LEs) string { + return fmt.Sprintf("Packet %s is sent", pktName) + }) if state.Ctx.HdrUsage { os.Remove(pth + HdrSuffix) } } else { - state.Ctx.LogE("sp-done", lesp, err, "") + state.Ctx.LogE("sp-done", lesp, err, logMsg) } case SPTypeFreq: lesp := append(les, LE{"Type", "freq"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}) - lesp = append(lesp, LE{"Offset", freq.Offset}) - state.Ctx.LogD("sp-process", lesp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset}) + state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: queuing", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) nice, exists := state.infosOurSeen[*freq.Hash] if exists { if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { @@ -1231,22 +1605,38 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} state.Unlock() } else { - state.Ctx.LogD("sp-process", lesp, "skipping") + state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: skipping", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } } else { - state.Ctx.LogD("sp-process", lesp, "unknown") + state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: unknown", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } default: state.Ctx.LogE( - "sp-process", + "sp-process-type-unknown", append(les, LE{"Type", head.Type}), errors.New("unknown type"), - "", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): %d", + state.Node.Name, NicenessFmt(state.Nice), head.Type, + ) + }, ) return nil, BadPktType } } + if infosGot { var pkts int var size uint64 @@ -1256,12 +1646,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { size += info.Size } state.RUnlock() - state.Ctx.LogI("sp-infos", LEs{ + state.Ctx.LogI("sp-infos-rx", LEs{ {"XX", string(TRx)}, {"Node", state.Node.Id}, {"Pkts", pkts}, {"Size", int64(size)}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "%s has got for us: %d packets, %s", + state.Node.Name, pkts, humanize.IBytes(size), + ) + }) } return payloadsSplit(replies), nil } diff --git a/src/tmp.go b/src/tmp.go index b99ef3d..d3c833e 100644 --- a/src/tmp.go +++ b/src/tmp.go @@ -19,6 +19,7 @@ package nncp import ( "bufio" + "fmt" "hash" "io" "os" @@ -44,7 +45,9 @@ func (ctx *Ctx) NewTmpFile() (*os.File, error) { } fd, err := TempFile(jobsPath, "") if err == nil { - ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, "created") + ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, func(les LEs) string { + return "Temporary file created: %s" + fd.Name() + }) } return fd, err } @@ -113,7 +116,13 @@ func (tmp *TmpFileWHash) Commit(dir string) error { return err } checksum := tmp.Checksum() - tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit") + tmp.ctx.LogD( + "tmp-rename", + LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, + func(les LEs) string { + return fmt.Sprintf("Temporary file: %s -> %s", tmp.Fd.Name(), checksum) + }, + ) if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil { return err } diff --git a/src/toss.go b/src/toss.go index 576abb1..37acf54 100644 --- a/src/toss.go +++ b/src/toss.go @@ -53,13 +53,14 @@ func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader "Subject: " + mime.BEncoding.Encode("UTF-8", subject), } if len(body) > 0 { - lines = append(lines, []string{ + lines = append( + lines, "MIME-Version: 1.0", "Content-Type: text/plain; charset=utf-8", "Content-Transfer-Encoding: base64", "", base64.StdEncoding.EncodeToString(body), - }...) + ) } return strings.NewReader(strings.Join(lines, "\n")) } @@ -71,7 +72,6 @@ func (ctx *Ctx) Toss( ) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { - ctx.LogE("rx", LEs{}, err, "lock") return false } defer ctx.UnlockDir(dirLock) @@ -84,14 +84,29 @@ func (ctx *Ctx) Toss( defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { pktName := filepath.Base(job.Path) - les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}} + les := LEs{ + {"Node", job.PktEnc.Sender}, + {"Pkt", pktName}, + {"Nice", int(job.PktEnc.Nice)}, + } if job.PktEnc.Nice > nice { - ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice") + ctx.LogD("rx-too-nice", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: too nice: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + NicenessFmt(job.PktEnc.Nice), + ) + }) continue } fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("rx", les, err, "open") + ctx.LogE("rx-open", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: opening %s", + ctx.NodeName(job.PktEnc.Sender), pktName, job.Path, + ) + }) isBad = true continue } @@ -113,7 +128,12 @@ func (ctx *Ctx) Toss( var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { - ctx.LogE("rx", les, err, "unmarshal") + ctx.LogE("rx-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: unmarshal", + ctx.NodeName(job.PktEnc.Sender), pktName, + ) + }) isBad = true goto Closing } @@ -124,7 +144,14 @@ func (ctx *Ctx) Toss( } pktSize -= pktSizeBlocks * poly1305.TagSize les = append(les, LE{"Size", pktSize}) - ctx.LogD("rx", les, "taken") + ctx.LogD("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s (%s)", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), + ) + }) + switch pkt.Type { case PktTypeExec, PktTypeExecFat: if noExec { @@ -137,14 +164,20 @@ func (ctx *Ctx) Toss( args = append(args, string(p)) } argsStr := strings.Join(append([]string{handle}, args...), " ") - les = append(les, LEs{ - {"Type", "exec"}, - {"Dst", argsStr}, - }...) + les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr}) sender := ctx.Neigh[*job.PktEnc.Sender] cmdline, exists := sender.Exec[handle] if !exists || len(cmdline) == 0 { - ctx.LogE("rx", les, errors.New("No handle found"), "") + ctx.LogE( + "rx-no-handle", les, errors.New("No handle found"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }, + ) isBad = true goto Closing } @@ -154,10 +187,7 @@ func (ctx *Ctx) Toss( } } if !dryRun { - cmd := exec.Command( - cmdline[0], - append(cmdline[1:], args...)..., - ) + cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...) cmd.Env = append( cmd.Env, "NNCP_SELF="+ctx.Self.Id.String(), @@ -171,7 +201,13 @@ func (ctx *Ctx) Toss( } output, err := cmd.Output() if err != nil { - ctx.LogE("rx", les, err, "handle") + ctx.LogE("rx-hande", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: handling", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) isBad = true goto Closing } @@ -189,12 +225,24 @@ func (ctx *Ctx) Toss( "Exec from %s: %s", sender.Name, argsStr, ), output) if err = cmd.Run(); err != nil { - ctx.LogE("rx", les, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) } } } } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got exec from %s to %s (%s)", + ctx.NodeName(job.PktEnc.Sender), argsStr, + humanize.IBytes(uint64(pktSize)), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -202,68 +250,135 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) } } + case PktTypeFile: if noFile { goto Closing } dst := string(pkt.Path[:int(pkt.PathLen)]) - les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...) + les = append(les, LE{"Type", "file"}, LE{"Dst", dst}) if filepath.IsAbs(dst) { - ctx.LogE("rx", les, errors.New("non-relative destination path"), "") + ctx.LogE( + "rx-non-rel", les, errors.New("non-relative destination path"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }, + ) isBad = true goto Closing } incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming if incoming == nil { - ctx.LogE("rx", les, errors.New("incoming is not allowed"), "") + ctx.LogE( + "rx-no-incoming", les, errors.New("incoming is not allowed"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }, + ) isBad = true goto Closing } dir := filepath.Join(*incoming, path.Dir(dst)) if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { - ctx.LogE("rx", les, err, "mkdir") + ctx.LogE("rx-mkdir", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: mkdir", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if !dryRun { tmp, err := TempFile(dir, "file") if err != nil { - ctx.LogE("rx", les, err, "mktemp") + ctx.LogE("rx-mktemp", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: mktemp", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } les = append(les, LE{"Tmp", tmp.Name()}) - ctx.LogD("rx", les, "created") + ctx.LogD("rx-tmp-created", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: created: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, tmp.Name(), + ) + }) bufW := bufio.NewWriter(tmp) if _, err = CopyProgressed( bufW, pipeR, "Rx file", append(les, LE{"FullSize", pktSize}), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-copy", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: copying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-flush", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: flushing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-sync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: syncing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = tmp.Close(); err != nil { - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-close", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: closing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } @@ -275,7 +390,13 @@ func (ctx *Ctx) Toss( if os.IsNotExist(err) { break } - ctx.LogE("rx", les, err, "stat") + ctx.LogE("rx-stat", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: stating: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, dstPath, + ) + }) isBad = true goto Closing } @@ -283,16 +404,34 @@ func (ctx *Ctx) Toss( dstPathCtr++ } if err = os.Rename(tmp.Name(), dstPath); err != nil { - ctx.LogE("rx", les, err, "rename") + ctx.LogE("rx-rename", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: renaming", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } if err = DirSync(*incoming); err != nil { - ctx.LogE("rx", les, err, "sync") + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } les = les[:len(les)-1] // delete Tmp } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got file %s (%s) from %s", + dst, humanize.IBytes(uint64(pktSize)), + ctx.NodeName(job.PktEnc.Sender), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -300,7 +439,13 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx-remove", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: removing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) @@ -317,24 +462,46 @@ func (ctx *Ctx) Toss( humanize.IBytes(uint64(pktSize)), ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", les, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) } } } + case PktTypeFreq: if noFreq { goto Closing } src := string(pkt.Path[:int(pkt.PathLen)]) + les := append(les, LE{"Type", "freq"}, LE{"Src", src}) if filepath.IsAbs(src) { - ctx.LogE("rx", les, errors.New("non-relative source path"), "") + ctx.LogE( + "rx-non-rel", les, errors.New("non-relative source path"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, + ) + }, + ) isBad = true goto Closing } - les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...) dstRaw, err := ioutil.ReadAll(pipeR) if err != nil { - ctx.LogE("rx", les, err, "read") + ctx.LogE("rx-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s: reading", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, + ) + }) isBad = true goto Closing } @@ -343,7 +510,16 @@ func (ctx *Ctx) Toss( sender := ctx.Neigh[*job.PktEnc.Sender] freqPath := sender.FreqPath if freqPath == nil { - ctx.LogE("rx", les, errors.New("freqing is not allowed"), "") + ctx.LogE( + "rx-no-freq", les, errors.New("freqing is not allowed"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }, + ) isBad = true goto Closing } @@ -358,12 +534,23 @@ func (ctx *Ctx) Toss( sender.FreqMaxSize, ) if err != nil { - ctx.LogE("rx", les, err, "tx file") + ctx.LogE("rx-tx", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: txing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) isBad = true goto Closing } } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got file request %s to %s", + src, ctx.NodeName(job.PktEnc.Sender), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -371,7 +558,13 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx-remove", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: removing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) @@ -385,10 +578,17 @@ func (ctx *Ctx) Toss( "Freq from %s: %s", sender.Name, src, ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", les, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) } } } + case PktTypeTrns: if noTrns { goto Closing @@ -397,21 +597,39 @@ func (ctx *Ctx) Toss( copy(dst[:], pkt.Path[:int(pkt.PathLen)]) nodeId := NodeId(*dst) node, known := ctx.Neigh[nodeId] - les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...) + les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Tossing trns %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + nodeId.String(), + ) + } if !known { - ctx.LogE("rx", les, errors.New("unknown node"), "") + ctx.LogE("rx-unknown", les, errors.New("unknown node"), logMsg) isBad = true goto Closing } - ctx.LogD("rx", les, "taken") + ctx.LogD("rx-tx", les, logMsg) if !dryRun { if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil { - ctx.LogE("rx", les, err, "tx trns") + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": txing" + }) isBad = true goto Closing } } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got transitional packet from %s to %s (%s)", + ctx.NodeName(job.PktEnc.Sender), + ctx.NodeName(&nodeId), + humanize.IBytes(uint64(pktSize)), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -419,14 +637,33 @@ func (ctx *Ctx) Toss( } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing trns %s/%s (%s): %s: removing", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + ctx.NodeName(&nodeId), + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) } } + default: - ctx.LogE("rx", les, errors.New("unknown type"), "") + ctx.LogE( + "rx-type-unknown", les, errors.New("unknown type"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s (%s)", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + ) + }, + ) isBad = true } Closing: diff --git a/src/tx.go b/src/tx.go index 3e57f9b..73fbe4e 100644 --- a/src/tx.go +++ b/src/tx.go @@ -23,6 +23,7 @@ import ( "bytes" "crypto/rand" "errors" + "fmt" "hash" "io" "io/ioutil" @@ -33,6 +34,7 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" "golang.org/x/crypto/chacha20poly1305" @@ -87,7 +89,14 @@ func (ctx *Ctx) Tx( {"Node", hops[0].Id}, {"Nice", int(nice)}, {"Size", size}, - }, "wrote") + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (%s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) pktEncRaw, err = PktEncWrite( ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, ) @@ -109,7 +118,14 @@ func (ctx *Ctx) Tx( {"Node", node.Id}, {"Nice", int(nice)}, {"Size", size}, - }, "trns wrote") + }, func(les LEs) string { + return fmt.Sprintf( + "Tx trns packet to %s (%s) nice: %s", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) errs <- err dst.Close() // #nosec G104 @@ -371,10 +387,19 @@ func (ctx *Ctx) TxFile( {"Dst", dstPath}, {"Size", fileSize}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(fileSize)), + ctx.NodeName(node.Id), + dstPath, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -427,10 +452,19 @@ func (ctx *Ctx) TxFile( {"Dst", path}, {"Size", sizeToSend}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(sizeToSend)), + ctx.NodeName(node.Id), + path, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -460,10 +494,19 @@ func (ctx *Ctx) TxFile( {"Dst", path}, {"Size", metaPktSize}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(metaPktSize)), + ctx.NodeName(node.Id), + path, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -496,10 +539,17 @@ func (ctx *Ctx) TxFreq( {"Src", srcPath}, {"Dst", dstPath}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File request from %s:%s to %s sent", + ctx.NodeName(node.Id), srcPath, + dstPath, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -599,18 +649,25 @@ func (ctx *Ctx) TxExec( _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle) } + dst := strings.Join(append([]string{handle}, args...), " ") les := LEs{ {"Type", "exec"}, {"Node", node.Id}, {"Nice", int(nice)}, {"ReplyNice", int(replyNice)}, - {"Dst", strings.Join(append([]string{handle}, args...), " ")}, + {"Dst", dst}, {"Size", size}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Exec sent to %s@%s (%s)", + ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -622,10 +679,18 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {"Nice", int(nice)}, {"Size", size}, } - ctx.LogD("tx", les, "taken") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Transitional packet to %s (%s) (nice %s)", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + } + ctx.LogD("tx", les, logMsg) if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", les, err, err.Error()) + ctx.LogE("tx", les, err, logMsg) return err } tmp, err := ctx.NewTmpFileWHash() @@ -642,9 +707,9 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error nodePath := filepath.Join(ctx.Spool, node.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogI("tx", append(les, LE{"Err", err}), "sent") + ctx.LogI("tx", append(les, LE{"Err", err}), logMsg) } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104 return err -- 2.44.0