@verb{|uucp|} user:
@example
-# mkdir -p /var/service/nncp-toss/log
-# chmod 755 /var/service/nncp-toss/log /var/service/nncp-toss
-# cd /var/service/nncp-toss
+# mkdir -p /var/service/.nncp-toss/log
+# cd /var/service/.nncp-toss
-# cat > run_ <<EOF
+# cat > run <<EOF
#!/bin/sh -e
exec 2>&1
exec setuidgid uucp /usr/local/bin/nncp-toss -cycle 10
EOF
-# chmod 755 run_
-# cat > log/run_ <<EOF
+# cat > log/run <<EOF
#!/bin/sh -e
exec setuidgid uucp multilog t ./main
EOF
-# chmod 755 log/run_
-# mv log/run_ log/run ; mv run_ run
+# chmod -R 755 /var/service/.nncp-toss
+# mv /var/service/.nncp-toss /var/service/nncp-toss
@end example
@item
uucp stream tcp6 nowait nncpuser /usr/local/bin/nncp-daemon nncp-daemon -quiet -inetd
@end example
+@item
+ Or it can be also run as a @command{daemontools} daemon under
+ @url{http://cr.yp.to/ucspi-tcp.html, UCSPI-TCP}:
+
+@example
+# mkdir -p /var/service/.nncp-daemon/log
+# cd /var/service/.nncp-daemon
+
+# cat > run <<EOF
+#!/bin/sh -e
+exec envuidgid nncpuser tcpserver -DRHU -l 0 0 uucp \
+ /usr/local/bin/nncp-daemon -quiet -inetd
+EOF
+
+# cat > log/run <<EOF
+#!/bin/sh -e
+exec setuidgid uucp multilog t ./main
+EOF
+
+# chmod -R 755 /var/service/.nncp-daemon
+# mv /var/service/.nncp-daemon /var/service/nncp-daemon
+@end example
+
@end itemize
@multitable {XXXXX} {XXXX-XX-XX} {XXXX KiB} {link sign} {xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx}
@headitem Version @tab Date @tab Size @tab Tarball @tab SHA256 checksum
+@item @ref{Release 6.1.0, 6.1.0} @tab 2021-02-24 @tab 1040 KiB
+@tab @url{download/nncp-6.1.0.tar.xz, link} @url{download/nncp-6.1.0.tar.xz.sig, sign}
+@tab @code{083A533F 7D021206 9AE07F9F D6CD22E3 C5BE09E8 30F2C9C4 97D97CF6 14E5413F}
+
@item @ref{Release 6.0.0, 6.0.0} @tab 2021-01-23 @tab 1028 KiB
@tab @url{download/nncp-6.0.0.tar.xz, link} @url{download/nncp-6.0.0.tar.xz.sig, sign}
@tab @code{42FE8AA5 4520B3A1 ABB50D66 1BBBA6A1 41CE4E74 9B4816B0 D4C6845D 67465916}
@node Новости
@section Новости
+@node Релиз 6.2.0
+@subsection Релиз 6.2.0
+@itemize
+
+@item
+Возвращена работоспособность @option{-autotoss*} опций @command{nncp-caller}.
+
+@item
+Очередной рефакторинг и упрощение системы журналирования.
+Не должно быть видимых изменений для конечного пользователя.
+
+@end itemize
+
@node Релиз 6.1.0
@subsection Релиз 6.1.0
@itemize
See also this page @ref{Новости, on russian}.
+@node Release 6.2.0
+@section Release 6.2.0
+@itemize
+
+@item
+Returned @command{nncp-caller}'s @option{-autotoss*} options workability.
+
+@item
+Yet another logging refactoring and simplification.
+Should be no visible differences to the end user.
+
+@end itemize
+
@node Release 6.1.0
@section Release 6.1.0
@itemize
rm -fr nncp.html
-MAKEINFO_OPTS="$MAKEINFO_OPTS --html"
+MAKEINFO_OPTS="$MAKEINFO_OPTS --html --css-include style.css"
MAKEINFO_OPTS="$MAKEINFO_OPTS --set-customization-variable SHOW_TITLE=0"
MAKEINFO_OPTS="$MAKEINFO_OPTS --set-customization-variable DATE_IN_HEADER=1"
MAKEINFO_OPTS="$MAKEINFO_OPTS --set-customization-variable TOP_NODE_UP_URL=index.html"
$MAKEINFO_OPTS \
--set-customization-variable CLOSE_QUOTE_SYMBOL=\" \
--set-customization-variable OPEN_QUOTE_SYMBOL=\" \
- --set-customization-variable CSS_LINES="`cat style.css`" \
--output $3 index.texi
-<style type="text/css"><!--
body {
margin: auto;
width: 80em;
h1, h2, h3, h4, strong { color: #900090 }
pre { background-color: #CCCCCC }
table, th, td { border: 1px solid black ; border-collapse: collapse }
---></style>
# $FreeBSD: $
PORTNAME= nncp
-DISTVERSION= 5.7.0
+DISTVERSION= 6.1.0
CATEGORIES= net
MASTER_SITES= http://www.nncpgo.org/download/
LICENSE= GPLv3
LICENSE_FILE= ${WRKSRC}/COPYING
-USES= go:no_targets tar:xz
+USES= go:modules,no_targets tar:xz
USE_RC_SUBR= nncp-caller nncp-daemon nncp-toss
package nncp
import (
+ "fmt"
"net"
"time"
+ "github.com/dustin/go-humanize"
"github.com/gorhill/cronexpr"
)
) (isGood bool) {
for _, addr := range addrs {
les := LEs{{"Node", node.Id}, {"Addr", addr}}
- ctx.LogD("call", les, "dialing")
+ ctx.LogD("calling", les, func(les LEs) string {
+ return fmt.Sprintf("Calling %s (%s)", node.Name, addr)
+ })
var conn ConnDeadlined
var err error
if addr[0] == '|' {
conn, err = net.Dial("tcp", addr)
}
if err != nil {
- ctx.LogD("call", append(les, LE{"Err", err}), "dialing")
+ ctx.LogD("calling", append(les, LE{"Err", err}), func(les LEs) string {
+ return fmt.Sprintf("Calling %s (%s)", node.Name, addr)
+ })
continue
}
- ctx.LogD("call", les, "connected")
+ ctx.LogD("call-connected", les, func(les LEs) string {
+ return fmt.Sprintf("Connected %s (%s)", node.Name, addr)
+ })
state := SPState{
Ctx: ctx,
Node: node,
onlyPkts: onlyPkts,
}
if err = state.StartI(conn); err == nil {
- ctx.LogI("call-start", les, "connected")
+ ctx.LogI("call-started", les, func(les LEs) string {
+ return fmt.Sprintf("Connection to %s (%s)", node.Name, addr)
+ })
state.Wait()
- ctx.LogI("call-finish", LEs{
- {"Node", state.Node.Id},
- {"Duration", int64(state.Duration.Seconds())},
- {"RxBytes", state.RxBytes},
- {"TxBytes", state.TxBytes},
- {"RxSpeed", state.RxSpeed},
- {"TxSpeed", state.TxSpeed},
- }, "")
+ ctx.LogI("call-finished", append(
+ les,
+ LE{"Duration", int64(state.Duration.Seconds())},
+ LE{"RxBytes", state.RxBytes},
+ LE{"RxSpeed", state.RxSpeed},
+ LE{"TxBytes", state.TxBytes},
+ LE{"TxSpeed", state.TxSpeed},
+ ), func(les LEs) string {
+ return fmt.Sprintf(
+ "Finished call with %s (%d:%d:%d): %s received (%s/sec), %s transferred (%s/sec)",
+ node.Name,
+ int(state.Duration.Hours()),
+ int(state.Duration.Minutes()),
+ int(state.Duration.Seconds()),
+ humanize.IBytes(uint64(state.RxBytes)),
+ humanize.IBytes(uint64(state.RxSpeed)),
+ humanize.IBytes(uint64(state.TxBytes)),
+ humanize.IBytes(uint64(state.TxSpeed)),
+ )
+ })
isGood = true
conn.Close() // #nosec G104
break
} else {
- ctx.LogE("call-start", les, err, "")
+ ctx.LogE("call-started", les, err, func(les LEs) string {
+ return fmt.Sprintf("Connection to %s (%s)", node.Name, addr)
+ })
conn.Close() // #nosec G104
}
}
"bufio"
"bytes"
"errors"
+ "fmt"
"io"
"log"
"os"
func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool {
isBad := false
for job := range ctx.Jobs(nodeId, xx) {
+ pktName := Base32Codec.EncodeToString(job.HshValue[:])
les := LEs{
{"XX", string(xx)},
{"Node", nodeId},
- {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])},
+ {"Pkt", pktName},
{"FullSize", job.Size},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf("Checking: %s/%s/%s", nodeId, string(xx), pktName)
+ }
fd, err := os.Open(job.Path)
if err != nil {
- ctx.LogE("check", les, err, "")
+ ctx.LogE("checking", les, err, logMsg)
return true
}
gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs)
fd.Close() // #nosec G104
if err != nil {
- ctx.LogE("check", les, err, "")
+ ctx.LogE("checking", les, err, logMsg)
return true
}
if !gut {
isBad = true
- ctx.LogE("check", les, errors.New("bad"), "")
+ ctx.LogE("checking", les, errors.New("bad"), logMsg)
}
}
return isBad
"strings"
xdr "github.com/davecgh/go-xdr/xdr2"
- "go.cypherpunks.ru/nncp/v5"
+ "github.com/dustin/go-humanize"
+ "go.cypherpunks.ru/nncp/v6"
"golang.org/x/crypto/blake2b"
)
bufStdout := bufio.NewWriter(os.Stdout)
tarWr := tar.NewWriter(bufStdout)
for nodeId := range nodeIds {
- les := nncp.LEs{
- {K: "XX", V: string(nncp.TTx)},
- {K: "Node", V: nodeId.String()},
- {K: "Pkt", V: "dummy"},
- }
for job := range ctx.Jobs(&nodeId, nncp.TTx) {
pktName = filepath.Base(job.Path)
- les[len(les)-1].V = pktName
+ les := nncp.LEs{
+ {K: "XX", V: string(nncp.TTx)},
+ {K: "Node", V: nodeId.String()},
+ {K: "Pkt", V: pktName},
+ }
if job.PktEnc.Nice > nice {
- ctx.LogD("nncp-bundle", les, "too nice")
+ ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Bundle transfer %s/tx/%s: too nice %s",
+ ctx.NodeName(&nodeId),
+ pktName,
+ nncp.NicenessFmt(job.PktEnc.Nice),
+ )
+ })
continue
}
fd, err := os.Open(job.Path)
os.Remove(job.Path + nncp.HdrSuffix)
}
}
- ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "")
+ ctx.LogI(
+ "bundle-tx",
+ append(les, nncp.LE{K: "Size", V: job.Size}),
+ func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Bundle transfer, sent to node %s %s (%s)",
+ ctx.NodeName(&nodeId),
+ pktName,
+ humanize.IBytes(uint64(job.Size)),
+ )
+ },
+ )
}
}
if err = tarWr.Close(); err != nil {
if err != nil {
if err != io.EOF {
ctx.LogD(
- "nncp-bundle",
+ "bundle-rx-read-tar",
nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
- "error reading tar",
+ func(les nncp.LEs) string {
+ return "Bundle transfer rx: reading tar"
+ },
)
}
continue
}
if entry.Typeflag != tar.TypeDir {
ctx.LogD(
- "nncp-bundle",
- nncp.LEs{{K: "XX", V: string(nncp.TRx)}},
- "Expected NNCP/",
+ "bundle-rx-read-tar",
+ nncp.LEs{
+ {K: "XX", V: string(nncp.TRx)},
+ {K: "Err", V: errors.New("expected NNCP/")},
+ },
+ func(les nncp.LEs) string {
+ return "Bundle transfer rx: reading tar"
+ },
)
continue
}
if err != nil {
if err != io.EOF {
ctx.LogD(
- "nncp-bundle",
+ "bundle-rx-read-tar",
nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
- "error reading tar",
+ func(les nncp.LEs) string {
+ return "Bundle transfer rx: reading tar"
+ },
)
}
continue
}
les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
+ logMsg := func(les nncp.LEs) string {
+ return "Bundle transfer rx/" + entry.Name
+ }
if entry.Size < nncp.PktEncOverhead {
- ctx.LogD("nncp-bundle", les, "Too small packet")
+ ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": too small packet"
+ })
continue
}
if !ctx.IsEnoughSpace(entry.Size) {
- ctx.LogE("nncp-bundle", les, errors.New("not enough spool space"), "")
+ ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg)
continue
}
pktName := filepath.Base(entry.Name)
if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
- ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: "bad packet name"}), "")
+ ctx.LogD(
+ "bundle-rx",
+ append(les, nncp.LE{K: "Err", V: "bad packet name"}),
+ logMsg,
+ )
continue
}
if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
- ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: err}), "read")
+ ctx.LogD(
+ "bundle-rx",
+ append(les, nncp.LE{K: "Err", V: err}),
+ logMsg,
+ )
continue
}
if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
- ctx.LogD("nncp-bundle", les, "Bad packet structure")
+ ctx.LogD(
+ "bundle-rx",
+ append(les, nncp.LE{K: "Err", V: "Bad packet structure"}),
+ logMsg,
+ )
continue
}
if pktEnc.Magic != nncp.MagicNNCPEv4 {
- ctx.LogD("nncp-bundle", les, "Bad packet magic number")
+ ctx.LogD(
+ "bundle-rx",
+ append(les, nncp.LE{K: "Err", V: "Bad packet magic number"}),
+ logMsg,
+ )
continue
}
if pktEnc.Nice > nice {
- ctx.LogD("nncp-bundle", les, "too nice")
+ ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": too nice"
+ })
continue
}
if *pktEnc.Sender == *ctx.SelfId && *doDelete {
if len(nodeIds) > 0 {
if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
- ctx.LogD("nncp-bundle", les, "Recipient is not requested")
+ ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": recipient is not requested"
+ })
continue
}
}
{K: "Node", V: nodeId32},
{K: "Pkt", V: pktName},
}
+ logMsg = func(les nncp.LEs) string {
+ return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName)
+ }
dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
if _, err = os.Stat(dstPath); err != nil {
- ctx.LogD("nncp-bundle", les, "Packet is already missing")
+ ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": packet is already missing"
+ })
continue
}
hsh, err := blake2b.New256(nil)
log.Fatalln("Error during copying:", err)
}
if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
- ctx.LogI("nncp-bundle", les, "removed")
+ ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": removed"
+ })
if !*dryRun {
os.Remove(dstPath)
if ctx.HdrUsage {
}
}
} else {
- ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
+ ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg)
}
continue
}
if *pktEnc.Recipient != *ctx.SelfId {
- ctx.LogD("nncp-bundle", les, "Unknown recipient")
+ ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": unknown recipient"
+ })
continue
}
if len(nodeIds) > 0 {
if _, exists := nodeIds[*pktEnc.Sender]; !exists {
- ctx.LogD("nncp-bundle", les, "Sender is not requested")
+ ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": sender is not requested"
+ })
continue
}
}
{K: "Pkt", V: pktName},
{K: "FullSize", V: entry.Size},
}
+ logMsg = func(les nncp.LEs) string {
+ return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName)
+ }
dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
dstPath := filepath.Join(dstDirPath, pktName)
if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-bundle", les, "Packet already exists")
+ ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": packet already exists"
+ })
continue
}
if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-bundle", les, "Packet already exists")
+ ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": packet already seen"
+ })
continue
}
if *doCheck {
log.Fatalln("Error during copying:", err)
}
if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
- ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
+ ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
continue
}
} else {
log.Fatalln("Error during commiting:", err)
}
} else {
- ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
+ ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg)
tmp.Cancel()
continue
}
break
}
}
- ctx.LogI("nncp-bundle", les, "")
+ ctx.LogI("bundle-rx", les, func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Bundle transfer, received from %s %s (%s)",
+ sender, pktName, humanize.IBytes(uint64(entry.Size)),
+ )
+ })
}
}
}
"strings"
"time"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"sync"
"time"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
debug = flag.Bool("debug", false, "Print debug messages")
version = flag.Bool("version", false, "Print version information")
warranty = flag.Bool("warranty", false, "Print warranty information")
+
+ autoToss = flag.Bool("autotoss", false, "Toss after call is finished")
+ autoTossDoSeen = flag.Bool("autotoss-seen", false, "Create .seen files during tossing")
+ autoTossNoFile = flag.Bool("autotoss-nofile", false, "Do not process \"file\" packets during tossing")
+ autoTossNoFreq = flag.Bool("autotoss-nofreq", false, "Do not process \"freq\" packets during tossing")
+ autoTossNoExec = flag.Bool("autotoss-noexec", false, "Do not process \"exec\" packets during tossing")
+ autoTossNoTrns = flag.Bool("autotoss-notrns", false, "Do not process \"trns\" packets during tossing")
)
flag.Usage = usage
flag.Parse()
log.Fatalln("Invalid NODE specified:", err)
}
if len(node.Calls) == 0 {
- ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping")
+ ctx.LogD(
+ "caller-no-calls",
+ nncp.LEs{{K: "Node", V: node.Id}},
+ func(les nncp.LEs) string {
+ return fmt.Sprintf("%s node has no calls, skipping", node.Name)
+ },
+ )
continue
}
nodes = append(nodes, node)
} else {
for _, node := range ctx.Neigh {
if len(node.Calls) == 0 {
- ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping")
+ ctx.LogD(
+ "caller-no-calls",
+ nncp.LEs{{K: "Node", V: node.Id}},
+ func(les nncp.LEs) string {
+ return fmt.Sprintf("%s node has no calls, skipping", node.Name)
+ },
+ )
continue
}
nodes = append(nodes, node)
addrs = append(addrs, *call.Addr)
}
les := nncp.LEs{{K: "Node", V: node.Id}, {K: "CallIndex", V: i}}
+ logMsg := func(les nncp.LEs) string {
+ return fmt.Sprintf("%s node, call %d", node.Name, i)
+ }
for {
n := time.Now()
t := call.Cron.Next(n)
- ctx.LogD("caller", les, t.String())
+ ctx.LogD("caller-time", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": " + t.String()
+ })
if t.IsZero() {
- ctx.LogE("caller", les, errors.New("got zero time"), "")
+ ctx.LogE("caller", les, errors.New("got zero time"), logMsg)
return
}
time.Sleep(t.Sub(n))
node.Lock()
if node.Busy {
node.Unlock()
- ctx.LogD("caller", les, "busy")
+ ctx.LogD("caller-busy", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": busy"
+ })
continue
} else {
node.Busy = true
node.Unlock()
if call.WhenTxExists && call.Xx != "TRx" {
- ctx.LogD("caller", les, "checking tx existence")
+ ctx.LogD("caller", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": checking tx existence"
+ })
txExists := false
for job := range ctx.Jobs(node.Id, nncp.TTx) {
if job.PktEnc.Nice > call.Nice {
txExists = true
}
if !txExists {
- ctx.LogD("caller", les, "no tx")
+ ctx.LogD("caller-no-tx", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": no tx"
+ })
node.Lock()
node.Busy = false
node.Unlock()
var autoTossFinish chan struct{}
var autoTossBadCode chan bool
- if call.AutoToss {
+ if call.AutoToss || *autoToss {
autoTossFinish, autoTossBadCode = ctx.AutoToss(
node.Id,
call.Nice,
- call.AutoTossDoSeen,
- call.AutoTossNoFile,
- call.AutoTossNoFreq,
- call.AutoTossNoExec,
- call.AutoTossNoTrns,
+ call.AutoTossDoSeen || *autoTossDoSeen,
+ call.AutoTossNoFile || *autoTossNoFile,
+ call.AutoTossNoFreq || *autoTossNoFreq,
+ call.AutoTossNoExec || *autoTossNoExec,
+ call.AutoTossNoTrns || *autoTossNoTrns,
)
}
nil,
)
- if call.AutoToss {
+ if call.AutoToss || *autoToss {
close(autoTossFinish)
<-autoTossBadCode
}
"os"
xdr "github.com/davecgh/go-xdr/xdr2"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
"golang.org/x/crypto/blake2b"
"golang.org/x/term"
)
"os"
"github.com/hjson/hjson-go"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"fmt"
"os"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"os"
"path/filepath"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"time"
"github.com/gorhill/cronexpr"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"os"
"time"
- "go.cypherpunks.ru/nncp/v5"
+ "github.com/dustin/go-humanize"
+ "go.cypherpunks.ru/nncp/v6"
"golang.org/x/net/netutil"
)
NoCK: noCK,
}
if err := state.StartR(conn); err == nil {
- ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected")
+ ctx.LogI(
+ "call-started",
+ nncp.LEs{{K: "Node", V: state.Node.Id}},
+ func(les nncp.LEs) string { return "Connection with " + state.Node.Name },
+ )
nodeIdC <- state.Node.Id
state.Wait()
- ctx.LogI("call-finish", nncp.LEs{
+ ctx.LogI("call-finished", nncp.LEs{
{K: "Node", V: state.Node.Id},
{K: "Duration", V: int64(state.Duration.Seconds())},
{K: "RxBytes", V: state.RxBytes},
{K: "TxBytes", V: state.TxBytes},
{K: "RxSpeed", V: state.RxSpeed},
{K: "TxSpeed", V: state.TxSpeed},
- }, "")
+ }, func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Finished call with %s (%d:%d:%d): %s received (%s/sec), %s transferred (%s/sec)",
+ state.Node.Name,
+ int(state.Duration.Hours()),
+ int(state.Duration.Minutes()),
+ int(state.Duration.Seconds()),
+ humanize.IBytes(uint64(state.RxBytes)),
+ humanize.IBytes(uint64(state.RxSpeed)),
+ humanize.IBytes(uint64(state.TxBytes)),
+ humanize.IBytes(uint64(state.TxSpeed)),
+ )
+ })
} else {
nodeId := "unknown"
if state.Node == nil {
nodeIdC <- state.Node.Id
nodeId = state.Node.Id.String()
}
- ctx.LogI("call-start", nncp.LEs{{K: "Node", V: nodeId}}, "connected")
+ ctx.LogI(
+ "call-started",
+ nncp.LEs{{K: "Node", V: nodeId}},
+ func(les nncp.LEs) string { return "Connected to " + state.Node.Name },
+ )
}
close(nodeIdC)
}
if err != nil {
log.Fatalln("Can not accept connection:", err)
}
- ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted")
+ ctx.LogD(
+ "daemon-accepted",
+ nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}},
+ func(les nncp.LEs) string {
+ return "Accepted connection with " + conn.RemoteAddr().String()
+ },
+ )
go func(conn net.Conn) {
nodeIdC := make(chan *nncp.NodeId)
go performSP(ctx, conn, nice, *noCK, nodeIdC)
"log"
"os"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"os"
"strings"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"path/filepath"
"strings"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"log"
"os"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
"go.cypherpunks.ru/recfile"
)
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/klauspost/compress/zstd"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
xdr "github.com/davecgh/go-xdr/xdr2"
"github.com/dustin/go-humanize"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
"golang.org/x/crypto/blake2b"
)
}
var metaPkt nncp.ChunkedMeta
les := nncp.LEs{{K: "Path", V: path}}
+ logMsg := func(les nncp.LEs) string {
+ return fmt.Sprintf("Reassembling chunked file \"%s\"", path)
+ }
if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil {
- ctx.LogE("nncp-reass", les, err, "bad meta file")
+ ctx.LogE("reass-bad-meta", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": bad meta"
+ })
return false
}
fd.Close() // #nosec G104
if metaPkt.Magic != nncp.MagicNNCPMv1 {
- ctx.LogE("nncp-reass", les, nncp.BadMagic, "")
+ ctx.LogE("reass", les, nncp.BadMagic, logMsg)
return false
}
metaName := filepath.Base(path)
if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) {
- ctx.LogE("nncp-reass", les, errors.New("invalid filename suffix"), "")
+ ctx.LogE("reass", les, errors.New("invalid filename suffix"), logMsg)
return false
}
mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta)
fi, err := os.Stat(chunkPath)
lesChunk := append(les, nncp.LE{K: "Chunk", V: chunkNum})
if err != nil && os.IsNotExist(err) {
- ctx.LogI("nncp-reass", lesChunk, "missing")
+ ctx.LogI("reass-chunk-miss", lesChunk, func(les nncp.LEs) string {
+ return fmt.Sprintf("%s: chunk %d missing", logMsg(les), chunkNum)
+ })
allChunksExist = false
continue
}
badSize = uint64(fi.Size()) != metaPkt.ChunkSize
}
if badSize {
- ctx.LogE("nncp-reass", lesChunk, errors.New("invalid size"), "")
+ ctx.LogE(
+ "reass-chunk",
+ lesChunk,
+ errors.New("invalid size"),
+ func(les nncp.LEs) string {
+ return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum)
+ },
+ )
allChunksExist = false
}
}
}
if _, err = nncp.CopyProgressed(
hsh, bufio.NewReader(fd), "check",
- nncp.LEs{
- {K: "Pkt", V: chunkPath},
- {K: "FullSize", V: fi.Size()},
- },
+ nncp.LEs{{K: "Pkt", V: chunkPath}, {K: "FullSize", V: fi.Size()}},
ctx.ShowPrgrs,
); err != nil {
log.Fatalln(err)
fd.Close() // #nosec G104
if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 {
ctx.LogE(
- "nncp-reass",
+ "reass-chunk",
nncp.LEs{{K: "Path", V: path}, {K: "Chunk", V: chunkNum}},
- errors.New("checksum is bad"), "",
+ errors.New("checksum is bad"),
+ func(les nncp.LEs) string {
+ return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum)
+ },
)
allChecksumsGood = false
}
return false
}
if dryRun {
- ctx.LogI("nncp-reass", nncp.LEs{{K: "path", V: path}}, "ready")
+ ctx.LogI("reass", nncp.LEs{{K: "path", V: path}}, logMsg)
return true
}
log.Fatalln(err)
}
les = nncp.LEs{{K: "path", V: path}, {K: "Tmp", V: tmp.Name()}}
- ctx.LogD("nncp-reass", les, "created")
+ ctx.LogD("reass-tmp-created", les, func(les nncp.LEs) string {
+ return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name())
+ })
dst = tmp
}
dstW := bufio.NewWriter(dst)
}
if _, err = nncp.CopyProgressed(
dstW, bufio.NewReader(fd), "reass",
- nncp.LEs{
- {K: "Pkt", V: chunkPath},
- {K: "FullSize", V: fi.Size()},
- },
+ nncp.LEs{{K: "Pkt", V: chunkPath}, {K: "FullSize", V: fi.Size()}},
ctx.ShowPrgrs,
); err != nil {
log.Fatalln(err)
fd.Close() // #nosec G104
if !keep {
if err = os.Remove(chunkPath); err != nil {
- ctx.LogE("nncp-reass", append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, "")
+ ctx.LogE(
+ "reass-chunk",
+ append(les, nncp.LE{K: "Chunk", V: chunkNum}), err,
+ func(les nncp.LEs) string {
+ return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum)
+ },
+ )
hasErrors = true
}
}
log.Fatalln("Can not close:", err)
}
}
- ctx.LogD("nncp-reass", les, "written")
+ ctx.LogD("reass-written", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": written"
+ })
if !keep {
if err = os.Remove(path); err != nil {
- ctx.LogE("nncp-reass", les, err, "")
+ ctx.LogE("reass-removing", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": removing"
+ })
hasErrors = true
}
}
if stdout {
- ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done")
+ ctx.LogI("reass", nncp.LEs{{K: "Path", V: path}}, func(les nncp.LEs) string {
+ return logMsg(les) + ": done"
+ })
return !hasErrors
}
if err = nncp.DirSync(mainDir); err != nil {
log.Fatalln(err)
}
- ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done")
+ ctx.LogI("reass", nncp.LEs{{K: "Path", V: path}}, func(les nncp.LEs) string {
+ return logMsg(les) + ": done"
+ })
return !hasErrors
}
func findMetas(ctx *nncp.Ctx, dirPath string) []string {
dir, err := os.Open(dirPath)
defer dir.Close()
+ logMsg := func(les nncp.LEs) string {
+ return "Finding .meta in " + dirPath
+ }
if err != nil {
- ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "")
+ ctx.LogE("reass", nncp.LEs{{K: "Path", V: dirPath}}, err, logMsg)
return nil
}
fis, err := dir.Readdir(0)
dir.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "")
+ ctx.LogE("reass", nncp.LEs{{K: "Path", V: dirPath}}, err, logMsg)
return nil
}
metaPaths := make([]string, 0)
"strings"
"time"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
return nil
}
if now.Sub(info.ModTime()) < oldBoundary {
- ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping")
+ ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string {
+ return fmt.Sprintf("File %s: too fresh, skipping", path)
+ })
return nil
}
- ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
+ ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string {
+ return fmt.Sprintf("File %s: removed", path)
+ })
if *dryRun {
return nil
}
return nil
}
if strings.HasSuffix(info.Name(), ".lock") {
- ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
+ ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string {
+ return fmt.Sprintf("File %s: removed", path)
+ })
if *dryRun {
return nil
}
if info.IsDir() {
return nil
}
+ logMsg := func(les nncp.LEs) string {
+ return fmt.Sprintf("File %s: removed", path)
+ }
if now.Sub(info.ModTime()) < oldBoundary {
- ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping")
+ ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string {
+ return fmt.Sprintf("File %s: too fresh, skipping", path)
+ })
return nil
}
if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) ||
(*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) ||
(*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) ||
(*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) {
- ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
+ ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg)
if *dryRun {
return nil
}
return os.Remove(path)
}
if *pktRaw != "" && filepath.Base(info.Name()) == *pktRaw {
- ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
+ ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg)
if *dryRun {
return nil
}
if !*doSeen && !*doNoCK && !*doHdr && !*doPart &&
(*doRx || *doTx) &&
((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) {
- ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
+ ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg)
if *dryRun {
return nil
}
"sort"
"github.com/dustin/go-humanize"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"os"
"time"
- "go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
"os"
"path/filepath"
- "go.cypherpunks.ru/nncp/v5"
+ "github.com/dustin/go-humanize"
+ "go.cypherpunks.ru/nncp/v6"
)
func usage() {
var dir *os.File
var fis []os.FileInfo
var les nncp.LEs
+ var logMsg func(les nncp.LEs) string
if *txOnly {
goto Tx
}
{K: "XX", V: string(nncp.TRx)},
{K: "Dir", V: selfPath},
}
- ctx.LogD("nncp-xfer", les, "self")
+ logMsg = func(les nncp.LEs) string {
+ return "Packet transfer, received from self"
+ }
+ ctx.LogD("xfer-self", les, logMsg)
if _, err = os.Stat(selfPath); err != nil {
if os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", les, "no dir")
+ ctx.LogD("xfer-self-no-dir", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": no directory"
+ })
goto Tx
}
- ctx.LogE("nncp-xfer", les, err, "stat")
+ ctx.LogE("xfer-self-stat", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": stating"
+ })
isBad = true
goto Tx
}
dir, err = os.Open(selfPath)
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "open")
+ ctx.LogE("xfer-self-open", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": opening"
+ })
isBad = true
goto Tx
}
fis, err = dir.Readdir(0)
dir.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "read")
+ ctx.LogE("xfer-self-read", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": reading"
+ })
isBad = true
goto Tx
}
}
nodeId, err := nncp.NodeIdFromString(fi.Name())
les := append(les, nncp.LE{K: "Node", V: fi.Name()})
+ logMsg := func(les nncp.LEs) string {
+ return "Packet transfer, received from " + ctx.NodeName(nodeId)
+ }
if err != nil {
- ctx.LogD("nncp-xfer", les, "is not NodeId")
+ ctx.LogD("xfer-rx-not-node", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": is not NodeId"
+ })
continue
}
if nodeOnly != nil && *nodeId != *nodeOnly.Id {
- ctx.LogD("nncp-xfer", les, "skip")
+ ctx.LogD("xfer-rx-skip", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": skipping"
+ })
continue
}
if _, known := ctx.Neigh[*nodeId]; !known {
- ctx.LogD("nncp-xfer", les, "unknown")
+ ctx.LogD("xfer-rx-unknown", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": unknown"
+ })
continue
}
dir, err = os.Open(filepath.Join(selfPath, fi.Name()))
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "open")
+ ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": opening"
+ })
isBad = true
continue
}
fisInt, err := dir.Readdir(0)
dir.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "read")
+ ctx.LogE("xfer-rx-read", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": reading"
+ })
isBad = true
continue
}
}
filename := filepath.Join(dir.Name(), fiInt.Name())
les := append(les, nncp.LE{K: "File", V: filename})
+ logMsg := func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Packet transfer, received from %s: %s",
+ ctx.NodeName(nodeId), filename,
+ )
+ }
fd, err := os.Open(filename)
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "open")
+ ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": opening"
+ })
isBad = true
continue
}
pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 {
- ctx.LogD("nncp-xfer", les, "is not a packet")
+ ctx.LogD("xfer-rx-not-packet", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": is not a packet"
+ })
fd.Close() // #nosec G104
continue
}
if pktEnc.Nice > nice {
- ctx.LogD("nncp-xfer", les, "too nice")
+ ctx.LogD("xfer-rx-too-nice", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": too nice"
+ })
fd.Close() // #nosec G104
continue
}
les = append(les, nncp.LE{K: "Size", V: fiInt.Size()})
+ logMsg = func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Packet transfer, received from %s: %s (%s)",
+ ctx.NodeName(nodeId), filename,
+ humanize.IBytes(uint64(fiInt.Size())),
+ )
+ }
if !ctx.IsEnoughSpace(fiInt.Size()) {
- ctx.LogE("nncp-xfer", les, errors.New("is not enough space"), "")
+ ctx.LogE("xfer-rx", les, errors.New("is not enough space"), logMsg)
fd.Close() // #nosec G104
continue
}
err = w.Close()
}
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "copy")
+ ctx.LogE("xfer-rx", les, err, logMsg)
w.CloseWithError(err) // #nosec G104
}
}()
if _, err = nncp.CopyProgressed(
tmp.W, r, "Rx",
- append(les, nncp.LEs{
- {K: "Pkt", V: filename},
- {K: "FullSize", V: fiInt.Size()},
- }...),
+ append(
+ les,
+ nncp.LE{K: "Pkt", V: filename},
+ nncp.LE{K: "FullSize", V: fiInt.Size()},
+ ),
ctx.ShowPrgrs,
); err != nil {
- ctx.LogE("nncp-xfer", les, err, "copy")
+ ctx.LogE("xfer-rx", les, err, logMsg)
isBad = true
}
fd.Close() // #nosec G104
)); err != nil {
log.Fatalln(err)
}
- ctx.LogI("nncp-xfer", les, "")
+ ctx.LogI("xfer-rx", les, logMsg)
if !*keep {
if err = os.Remove(filename); err != nil {
- ctx.LogE("nncp-xfer", les, err, "remove")
+ ctx.LogE("xfer-rx-remove", les, err, logMsg)
isBad = true
}
}
return
}
for nodeId := range ctx.Neigh {
- les := nncp.LEs{
- {K: "XX", V: string(nncp.TTx)},
- {K: "Node", V: nodeId},
+ les := nncp.LEs{{K: "XX", V: string(nncp.TTx)}, {K: "Node", V: nodeId}}
+ logMsg := func(les nncp.LEs) string {
+ return "Packet transfer, sent to " + ctx.NodeName(&nodeId)
}
if nodeOnly != nil && nodeId != *nodeOnly.Id {
- ctx.LogD("nncp-xfer", les, "skip")
+ ctx.LogD("xfer-tx-skip", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": skipping"
+ })
continue
}
dirLock, err := ctx.LockDir(&nodeId, string(nncp.TTx))
}
nodePath := filepath.Join(flag.Arg(0), nodeId.String())
les = append(les, nncp.LE{K: "Dir", V: nodePath})
+ logMsg = func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Packet transfer, sent to %s: directory %s",
+ ctx.NodeName(&nodeId), nodePath,
+ )
+ }
_, err = os.Stat(nodePath)
if err != nil {
if os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", les, "does not exist")
+ ctx.LogD("xfer-tx-not-exist", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": does not exist"
+ })
if !*mkdir {
ctx.UnlockDir(dirLock)
continue
}
if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", les, err, "mkdir")
+ ctx.LogE("xfer-tx-mkdir", les, err, logMsg)
isBad = true
continue
}
} else {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", les, err, "stat")
+ ctx.LogE("xfer-tx", les, err, logMsg)
isBad = true
continue
}
}
dstPath := filepath.Join(nodePath, ctx.SelfId.String())
les[len(les)-1].V = dstPath
+ logMsg = func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Packet transfer, sent to %s: directory %s",
+ ctx.NodeName(&nodeId), dstPath,
+ )
+ }
_, err = os.Stat(dstPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", les, err, "mkdir")
+ ctx.LogE("xfer-tx-mkdir", les, err, logMsg)
isBad = true
continue
}
} else {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", les, err, "stat")
+ ctx.LogE("xfer-tx", les, err, logMsg)
isBad = true
continue
}
for job := range ctx.Jobs(&nodeId, nncp.TTx) {
pktName := filepath.Base(job.Path)
les := append(les, nncp.LE{K: "Pkt", V: pktName})
+ logMsg = func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "Packet transfer, sent to %s: %s",
+ ctx.NodeName(&nodeId), pktName,
+ )
+ }
if job.PktEnc.Nice > nice {
- ctx.LogD("nncp-xfer", les, "too nice")
+ ctx.LogD("xfer-tx-too-nice", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": too nice"
+ })
continue
}
if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", les, "already exists")
+ ctx.LogD("xfer-tx-exists", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": already exists"
+ })
continue
}
if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", les, "already exists")
+ ctx.LogD("xfer-tx-seen", les, func(les nncp.LEs) string {
+ return logMsg(les) + ": already seen"
+ })
continue
}
tmp, err := nncp.TempFile(dstPath, "xfer")
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "mktemp")
+ ctx.LogE("xfer-tx-mktemp", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": mktemp"
+ })
isBad = true
break
}
les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()})
- ctx.LogD("nncp-xfer", les, "created")
+ ctx.LogD("xfer-tx-tmp-create", les, func(les nncp.LEs) string {
+ return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name())
+ })
fd, err := os.Open(job.Path)
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "open")
+ ctx.LogE("xfer-tx-open", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": opening"
+ })
tmp.Close() // #nosec G104
isBad = true
continue
)
fd.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-xfer", les, err, "copy")
+ ctx.LogE("xfer-tx-copy", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": copying"
+ })
tmp.Close() // #nosec G104
isBad = true
continue
}
if err = bufW.Flush(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("nncp-xfer", les, err, "flush")
+ ctx.LogE("xfer-tx-flush", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": flushing"
+ })
isBad = true
continue
}
if err = tmp.Sync(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("nncp-xfer", les, err, "sync")
+ ctx.LogE("xfer-tx-sync", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": syncing"
+ })
isBad = true
continue
}
if err = tmp.Close(); err != nil {
- ctx.LogE("nncp-xfer", les, err, "sync")
+ ctx.LogE("xfer-tx-close", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": closing"
+ })
}
if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil {
- ctx.LogE("nncp-xfer", les, err, "rename")
+ ctx.LogE("xfer-tx-rename", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": renaming"
+ })
isBad = true
continue
}
if err = nncp.DirSync(dstPath); err != nil {
- ctx.LogE("nncp-xfer", les, err, "sync")
+ ctx.LogE("xfer-tx-dirsync", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": dirsyncing"
+ })
isBad = true
continue
}
os.Remove(filepath.Join(dstPath, pktName+".part")) // #nosec G104
les = les[:len(les)-1]
- ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "")
+ ctx.LogI(
+ "xfer-tx",
+ append(les, nncp.LE{K: "Size", V: copied}),
+ func(les nncp.LEs) string {
+ return fmt.Sprintf(
+ "%s (%s)", logMsg(les), humanize.IBytes(uint64(copied)),
+ )
+ },
+ )
if !*keep {
if err = os.Remove(job.Path); err != nil {
- ctx.LogE("nncp-xfer", les, err, "remove")
+ ctx.LogE("xfer-tx-remove", les, err, func(les nncp.LEs) string {
+ return logMsg(les) + ": removing"
+ })
isBad = true
} else if ctx.HdrUsage {
os.Remove(job.Path + nncp.HdrSuffix)
import (
"errors"
+ "fmt"
"io/ioutil"
"log"
"os"
func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error {
dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf("Ensuring directory %s existence", dirPath)
+ }
if err := os.MkdirAll(dirPath, os.FileMode(0777)); err != nil {
- ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "")
+ ctx.LogE("dir-ensure-mkdir", LEs{{"Dir", dirPath}}, err, logMsg)
return err
}
fd, err := os.Open(dirPath)
if err != nil {
- ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "")
+ ctx.LogE("dir-ensure-open", LEs{{"Dir", dirPath}}, err, logMsg)
return err
}
return fd.Close()
-module go.cypherpunks.ru/nncp/v5
+module go.cypherpunks.ru/nncp/v6
require (
github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892
package nncp
import (
- "errors"
"fmt"
- "strconv"
"strings"
"time"
- "github.com/dustin/go-humanize"
"go.cypherpunks.ru/recfile"
)
+func (ctx *Ctx) NodeName(id *NodeId) string {
+ idS := id.String()
+ node, err := ctx.FindNode(idS)
+ if err == nil {
+ return node.Name
+ }
+ return idS
+}
+
func (ctx *Ctx) HumanizeRec(rec string) string {
r := recfile.NewReader(strings.NewReader(rec))
le, err := r.NextMap()
}
func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
- nodeS := le["Node"]
- node, err := ctx.FindNode(nodeS)
- if err == nil {
- nodeS = node.Name
- }
- var sizeParsed uint64
- var size string
- if sizeRaw, exists := le["Size"]; exists {
- sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64)
- if err != nil {
- return "", err
- }
- size = humanize.IBytes(sizeParsed)
- }
-
- var msg string
- switch le["Who"] {
- case "tx":
- switch le["Type"] {
- case "file":
- msg = fmt.Sprintf(
- "File %s (%s) transfer to %s:%s: %s",
- le["Src"], size, nodeS, le["Dst"], le["Msg"],
- )
- case "freq":
- msg = fmt.Sprintf(
- "File request from %s:%s to %s: %s",
- nodeS, le["Src"], le["Dst"], le["Msg"],
- )
- case "exec":
- msg = fmt.Sprintf(
- "Exec to %s@%s (%s): %s",
- nodeS, le["Dst"], size, le["Msg"],
- )
- case "trns":
- msg = fmt.Sprintf(
- "Transitional packet to %s (%s) (nice %s): %s",
- nodeS, size, le["Nice"], le["Msg"],
- )
- default:
- return "", errors.New("unknown \"tx\" type")
- }
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- }
- case "rx":
- switch le["Type"] {
- case "exec":
- msg = fmt.Sprintf("Got exec from %s to %s (%s)", nodeS, le["Dst"], size)
- case "file":
- msg = fmt.Sprintf("Got file %s (%s) from %s", le["Dst"], size, nodeS)
- case "freq":
- msg = fmt.Sprintf("Got file request %s to %s", le["Src"], nodeS)
- case "trns":
- nodeT := le["Dst"]
- node, err := ctx.FindNode(nodeT)
- if err == nil {
- nodeT = node.Name
- }
- msg = fmt.Sprintf(
- "Got transitional packet from %s to %s (%s)",
- nodeS, nodeT, size,
- )
- default:
- return "", errors.New("unknown \"rx\" type")
- }
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- }
- case "check":
- msg = fmt.Sprintf("Checking: %s/%s/%s", le["Node"], le["XX"], le["Pkt"])
- if err, exists := le["Err"]; exists {
- msg += fmt.Sprintf(" %s", err)
- }
- case "nncp-xfer":
- switch le["XX"] {
- case "rx":
- msg = "Packet transfer, received from"
- case "tx":
- msg = "Packet transfer, sent to"
- default:
- return "", errors.New("unknown XX")
- }
- if nodeS != "" {
- msg += " node " + nodeS
- }
- if size != "" {
- msg += fmt.Sprintf(" (%s)", size)
- }
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- } else {
- msg += " " + le["Msg"]
- }
- case "nncp-bundle":
- switch le["XX"] {
- case "rx":
- msg = "Bundle transfer, received from"
- case "tx":
- msg = "Bundle transfer, sent to"
- default:
- return "", errors.New("unknown XX")
- }
- if nodeS != "" {
- msg += " node " + nodeS
- }
- msg += " " + le["Pkt"]
- if size != "" {
- msg += fmt.Sprintf(" (%s)", size)
- }
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- }
- case "nncp-rm":
- msg += "removing " + le["File"]
- case "call-start":
- msg = fmt.Sprintf("Connection to %s", nodeS)
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- }
- case "call-finish":
- rx, err := strconv.ParseUint(le["RxBytes"], 10, 64)
- if err != nil {
- return "", err
- }
- rxs, err := strconv.ParseUint(le["RxSpeed"], 10, 64)
- if err != nil {
- return "", err
- }
- tx, err := strconv.ParseUint(le["TxBytes"], 10, 64)
- if err != nil {
- return "", err
- }
- txs, err := strconv.ParseUint(le["TxSpeed"], 10, 64)
- if err != nil {
- return "", err
- }
- msg = fmt.Sprintf(
- "Finished call with %s: %s received (%s/sec), %s transferred (%s/sec)",
- nodeS,
- humanize.IBytes(uint64(rx)), humanize.IBytes(uint64(rxs)),
- humanize.IBytes(uint64(tx)), humanize.IBytes(uint64(txs)),
- )
- case "sp-start":
- if nodeS == "" {
- msg += "SP"
- if peer, exists := le["Peer"]; exists {
- msg += fmt.Sprintf(": %s", peer)
- }
- } else {
- nice, err := NicenessParse(le["Nice"])
- if err != nil {
- return "", err
- }
- msg += fmt.Sprintf("SP with %s (nice %s)", nodeS, NicenessFmt(nice))
- }
- if m, exists := le["Msg"]; exists {
- msg += ": " + m
- }
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- }
- case "sp-info":
- nice, err := NicenessParse(le["Nice"])
- if err != nil {
- return "", err
- }
- msg = fmt.Sprintf(
- "Packet %s (%s) (nice %s)",
- le["Pkt"], size, NicenessFmt(nice),
- )
- if offset := le["Offset"]; offset != "" {
- offsetParsed, err := strconv.ParseUint(offset, 10, 64)
- if err != nil {
- return "", err
- }
- msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
- }
- if m, exists := le["Msg"]; exists {
- msg += ": " + m
- }
- case "sp-infos":
- switch le["XX"] {
- case "rx":
- msg = fmt.Sprintf("%s has got for us: ", nodeS)
- case "tx":
- msg = fmt.Sprintf("We have got for %s: ", nodeS)
- default:
- return "", errors.New("unknown XX")
- }
- msg += fmt.Sprintf("%s packets, %s", le["Pkts"], size)
- case "sp-process":
- msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, le["Pkt"], size, le["Msg"])
- case "sp-file":
- switch le["XX"] {
- case "rx":
- msg = "Got packet "
- case "tx":
- msg = "Sent packet "
- default:
- return "", errors.New("unknown XX")
- }
- fullsize, err := strconv.ParseUint(le["FullSize"], 10, 64)
- if err != nil {
- return "", err
- }
- msg += fmt.Sprintf(
- "%s %d%% (%s / %s)",
- le["Pkt"],
- 100*sizeParsed/fullsize,
- humanize.IBytes(uint64(sizeParsed)),
- humanize.IBytes(uint64(fullsize)),
- )
- case "sp-done":
- switch le["XX"] {
- case "rx":
- msg = fmt.Sprintf("Packet %s is retreived (%s)", le["Pkt"], size)
- case "tx":
- msg = fmt.Sprintf("Packet %s is sent", le["Pkt"])
- default:
- return "", errors.New("unknown XX")
- }
- case "nncp-reass":
- chunkNum, exists := le["Chunk"]
- if exists {
- msg = fmt.Sprintf(
- "Reassembling chunked file \"%s\" (chunk %s): %s",
- le["Path"], chunkNum, le["Msg"],
- )
- } else {
- msg = fmt.Sprintf(
- "Reassembling chunked file \"%s\": %s",
- le["Path"], le["Msg"],
- )
- }
- if err, exists := le["Err"]; exists {
- msg += ": " + err
- }
- case "lockdir":
- msg = fmt.Sprintf("Acquire lock for %s: %s", le["Path"], le["Err"])
- default:
- return "", errors.New("unknown Who")
- }
when, err := time.Parse(time.RFC3339Nano, le["When"])
if err != nil {
return "", err
}
var level string
- if _, isErr := le["Err"]; isErr {
+ msg := le["Msg"]
+ if errMsg, isErr := le["Err"]; isErr {
level = "ERROR "
+ msg += ": " + errMsg
}
return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg), nil
}
import (
"bytes"
+ "fmt"
"os"
"path/filepath"
"strings"
xdr "github.com/davecgh/go-xdr/xdr2"
+ "github.com/dustin/go-humanize"
)
type TRxTx string
func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
tmpHdr, err := ctx.NewTmpFile()
if err != nil {
- ctx.LogE("hdr-write", []LE{}, err, "new")
+ ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string {
+ return "Header writing: new temporary file"
+ })
return err
}
if _, err = tmpHdr.Write(pktEncRaw); err != nil {
- ctx.LogE("hdr-write", []LE{}, err, "write")
+ ctx.LogE("hdr-write-write", nil, err, func(les LEs) string {
+ return "Header writing: writing"
+ })
os.Remove(tmpHdr.Name())
return err
}
if err = tmpHdr.Close(); err != nil {
- ctx.LogE("hdr-write", []LE{}, err, "close")
+ ctx.LogE("hdr-write-close", nil, err, func(les LEs) string {
+ return "Header writing: closing"
+ })
os.Remove(tmpHdr.Name())
return err
}
if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
- ctx.LogE("hdr-write", []LE{}, err, "rename")
+ ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string {
+ return "Header writing: renaming"
+ })
return err
}
return err
if err != nil || pktEnc.Magic != MagicNNCPEv4 {
continue
}
- ctx.LogD("jobs", LEs{
+ ctx.LogD("job", LEs{
{"XX", string(xx)},
{"Node", pktEnc.Sender},
{"Name", name},
{"Nice", int(pktEnc.Nice)},
{"Size", fi.Size()},
- }, "taken")
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Job %s/%s/%s nice: %s size: %s",
+ pktEnc.Sender, string(xx), name,
+ NicenessFmt(pktEnc.Nice),
+ humanize.IBytes(uint64(fi.Size())),
+ )
+ })
if !hdrExists && ctx.HdrUsage {
ctx.HdrWrite(pktEncRaw, pth)
}
func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) {
if err := ctx.ensureRxDir(nodeId); err != nil {
- ctx.LogE("lockdir", LEs{}, err, "")
return nil, err
}
lockPath := filepath.Join(ctx.Spool, nodeId.String(), lockCtx) + ".lock"
os.FileMode(0666),
)
if err != nil {
- ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "")
+ ctx.LogE("lockdir-open", LEs{{"Path", lockPath}}, err, func(les LEs) string {
+ return "Locking directory: opening %s" + lockPath
+ })
return nil, err
}
err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
if err != nil {
- ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "")
+ ctx.LogE("lockdir-flock", LEs{{"Path", lockPath}}, err, func(les LEs) string {
+ return "Locking directory: locking %s" + lockPath
+ })
dirLock.Close() // #nosec G104
return nil, err
}
fd.Close() // #nosec G104
}
-func (ctx *Ctx) LogD(who string, les LEs, msg string) {
+func (ctx *Ctx) LogD(who string, les LEs, msg func(LEs) string) {
if !ctx.Debug {
return
}
les = append(LEs{{"Debug", true}, {"Who", who}}, les...)
- if msg != "" {
- les = append(les, LE{"Msg", msg})
- }
+ les = append(les, LE{"Msg", msg(les)})
fmt.Fprint(os.Stderr, les.Rec())
}
-func (ctx *Ctx) LogI(who string, les LEs, msg string) {
+func (ctx *Ctx) LogI(who string, les LEs, msg func(LEs) string) {
les = append(LEs{{"Who", who}}, les...)
- if msg != "" {
- les = append(les, LE{"Msg", msg})
- }
+ les = append(les, LE{"Msg", msg(les)})
rec := les.Rec()
if !ctx.Quiet {
fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec))
ctx.Log(rec)
}
-func (ctx *Ctx) LogE(who string, les LEs, err error, msg string) {
+func (ctx *Ctx) LogE(who string, les LEs, err error, msg func(LEs) string) {
les = append(LEs{{"Err", err.Error()}, {"Who", who}}, les...)
- if msg != "" {
- les = append(les, LE{"Msg", msg})
- }
+ les = append(les, LE{"Msg", msg(les)})
rec := les.Rec()
if !ctx.Quiet {
fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec))
}
return strconv.Itoa(int(nice))
}
+
+type ByNice []*SPInfo
+
+func (a ByNice) Len() int {
+ return len(a)
+}
+
+func (a ByNice) Swap(i, j int) {
+ a[i], a[j] = a[j], a[i]
+}
+
+func (a ByNice) Less(i, j int) bool {
+ return a[i].Nice < a[j].Nice
+}
const Base32Encoded32Len = 52
var (
- Version string = "6.1.0"
+ Version string = "6.2.0"
Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding)
)
"time"
"github.com/dustin/go-humanize"
- "go.cypherpunks.ru/nncp/v5/uilive"
+ "go.cypherpunks.ru/nncp/v6/uilive"
)
func init() {
+++ /dev/null
-package nncp
-
-type ByNice []*SPInfo
-
-func (a ByNice) Len() int {
- return len(a)
-}
-
-func (a ByNice) Swap(i, j int) {
- a[i], a[j] = a[j], a[i]
-}
-
-func (a ByNice) Less(i, j int) bool {
- return a[i].Nice < a[j].Nice
-}
"bytes"
"crypto/subtle"
"errors"
+ "fmt"
"hash"
"io"
"os"
"time"
xdr "github.com/davecgh/go-xdr/xdr2"
+ "github.com/dustin/go-humanize"
"github.com/flynn/noise"
"golang.org/x/crypto/blake2b"
)
func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
for hshValue := range appeared {
+ pktName := Base32Codec.EncodeToString(hshValue[:])
les := LEs{
{"XX", string(TRx)},
{"Node", nodeId},
- {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
+ {"Pkt", pktName},
}
- ctx.LogD("sp-checker", les, "checking")
+ ctx.LogD("sp-checker", les, func(les LEs) string {
+ return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
+ })
size, err := ctx.CheckNoCK(nodeId, hshValue)
les = append(les, LE{"Size", size})
if err != nil {
- ctx.LogE("sp-checker", les, err, "")
+ ctx.LogE("sp-checker", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
+ humanize.IBytes(uint64(size)),
+ )
+ })
continue
}
- ctx.LogI("sp-done", les, "")
+ ctx.LogI("sp-checker-done", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Packet %s is retreived (%s)",
+ pktName, humanize.IBytes(uint64(size)),
+ )
+ })
go func(hsh *[32]byte) { checked <- hsh }(hshValue)
}
}
var payloads [][]byte
for _, info := range infos {
payloads = append(payloads, MarshalSP(SPTypeInfo, info))
+ pktName := Base32Codec.EncodeToString(info.Hash[:])
ctx.LogD("sp-info-our", LEs{
{"Node", nodeId},
- {"Name", Base32Codec.EncodeToString(info.Hash[:])},
+ {"Name", pktName},
{"Size", info.Size},
- }, "")
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Our info: %s/tx/%s (%s)",
+ ctx.NodeName(nodeId),
+ pktName,
+ humanize.IBytes(info.Size),
+ )
+ })
}
if totalSize > 0 {
- ctx.LogI("sp-infos", LEs{
+ ctx.LogI("sp-infos-tx", LEs{
{"XX", string(TTx)},
{"Node", nodeId},
{"Pkts", len(payloads)},
{"Size", totalSize},
- }, "")
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "We have got for %s: %d packets, %s",
+ ctx.NodeName(nodeId),
+ len(payloads),
+ humanize.IBytes(uint64(totalSize)),
+ )
+ })
}
return payloadsSplit(payloads)
}
return err
}
les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
- state.Ctx.LogD("sp-start", les, "sending first message")
+ state.Ctx.LogD("sp-startI", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): sending first message",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err = state.WriteSP(conn, buf, false); err != nil {
- state.Ctx.LogE("sp-start", les, err, "")
+ state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): writing",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
state.dirUnlock()
return err
}
- state.Ctx.LogD("sp-start", les, "waiting for first message")
+ state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): waiting for first message",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if buf, err = state.ReadSP(conn); err != nil {
- state.Ctx.LogE("sp-start", les, err, "")
+ state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): reading",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
state.dirUnlock()
return err
}
payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
if err != nil {
- state.Ctx.LogE("sp-start", les, err, "")
+ state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): reading Noise message",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
state.dirUnlock()
return err
}
- state.Ctx.LogD("sp-start", les, "starting workers")
+ state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): starting workers",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
- state.Ctx.LogE("sp-start", les, err, "")
+ state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): starting workers",
+ state.Node.Name,
+ NicenessFmt(state.Nice),
+ )
+ })
state.dirUnlock()
}
return err
var buf []byte
var payload []byte
- state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP nice %s: waiting for first message",
+ NicenessFmt(state.Nice),
+ )
+ }
+ les := LEs{{"Nice", int(state.Nice)}}
+ state.Ctx.LogD("sp-startR", les, logMsg)
conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if buf, err = state.ReadSP(conn); err != nil {
- state.Ctx.LogE("sp-start", LEs{}, err, "")
+ state.Ctx.LogE("sp-startR-read", les, err, logMsg)
return err
}
if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
- state.Ctx.LogE("sp-start", LEs{}, err, "")
+ state.Ctx.LogE("sp-startR-read", les, err, logMsg)
return err
}
}
if node == nil {
peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
- state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
- return errors.New("Unknown peer: " + peerId)
+ err = errors.New("unknown peer: " + peerId)
+ state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
+ return err
}
state.Node = node
state.rxRate = node.RxRate
state.txRate = node.TxRate
state.onlineDeadline = node.OnlineDeadline
state.maxOnlineTime = node.MaxOnlineTime
- les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
+ les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
if err = state.Ctx.ensureRxDir(node.Id); err != nil {
return err
firstPayload = append(firstPayload, SPHaltMarshalized...)
}
- state.Ctx.LogD("sp-start", les, "sending first message")
+ state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): sending first message",
+ node.Name, NicenessFmt(state.Nice),
+ )
+ })
buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
if err != nil {
state.dirUnlock()
}
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err = state.WriteSP(conn, buf, false); err != nil {
- state.Ctx.LogE("sp-start", les, err, "")
+ state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): writing",
+ node.Name, NicenessFmt(state.Nice),
+ )
+ })
state.dirUnlock()
return err
}
- state.Ctx.LogD("sp-start", les, "starting workers")
+ state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): starting workers",
+ node.Name, NicenessFmt(state.Nice),
+ )
+ })
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
state.dirUnlock()
go func() {
for _, payload := range infosPayloads[1:] {
state.Ctx.LogD(
- "sp-work",
+ "sp-queue-remaining",
append(les, LE{"Size", len(payload)}),
- "queuing remaining payload",
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): queuing remaining payload (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ },
)
state.payloads <- payload
}
}
// Processing of first payload and queueing its responses
- state.Ctx.LogD(
- "sp-work",
- append(les, LE{"Size", len(payload)}),
- "processing first payload",
- )
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): processing first payload (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ }
+ state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
replies, err := state.ProcessSP(payload)
if err != nil {
- state.Ctx.LogE("sp-work", les, err, "")
+ state.Ctx.LogE("sp-process", les, err, logMsg)
return err
}
state.wg.Add(1)
go func() {
for _, reply := range replies {
state.Ctx.LogD(
- "sp-work",
+ "sp-queue-reply",
append(les, LE{"Size", len(reply)}),
- "queuing reply",
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): queuing reply (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ },
)
state.payloads <- reply
}
&state.infosOurSeen,
) {
state.Ctx.LogD(
- "sp-work",
+ "sp-queue-info",
append(les, LE{"Size", len(payload)}),
- "queuing new info",
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): queuing new info (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ },
)
state.payloads <- payload
}
var ping bool
select {
case <-state.pings:
- state.Ctx.LogD("sp-xmit", les, "got ping")
+ state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): got ping",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
payload = SPPingMarshalized
ping = true
case payload = <-state.payloads:
state.Ctx.LogD(
- "sp-xmit",
+ "sp-got-payload",
append(les, LE{"Size", len(payload)}),
- "got payload",
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): got payload (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ },
)
default:
state.RLock()
if state.txRate > 0 {
time.Sleep(time.Second / time.Duration(state.txRate))
}
- lesp := append(les, LEs{
- {"XX", string(TTx)},
- {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
- {"Size", int64(freq.Offset)},
- }...)
- state.Ctx.LogD("sp-file", lesp, "queueing")
+ pktName := Base32Codec.EncodeToString(freq.Hash[:])
+ lesp := append(
+ les,
+ LE{"XX", string(TTx)},
+ LE{"Pkt", pktName},
+ LE{"Size", int64(freq.Offset)},
+ )
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): tx/%s (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ pktName,
+ humanize.IBytes(freq.Offset),
+ )
+ }
+ state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
+ return logMsg(les) + ": queueing"
+ })
pth := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
if !exists {
fd, err := os.Open(pth)
if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": opening"
+ })
return
}
fi, err := fd.Stat()
if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": stating"
+ })
return
}
fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
fullSize := fdAndFullSize.fullSize
var buf []byte
if freq.Offset < uint64(fullSize) {
- state.Ctx.LogD("sp-file", lesp, "seeking")
+ state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
+ return logMsg(les) + ": seeking"
+ })
if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": seeking"
+ })
return
}
buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
n, err := fd.Read(buf)
if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": reading"
+ })
return
}
buf = buf[:n]
- state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
+ state.Ctx.LogD(
+ "sp-file-read",
+ append(lesp, LE{"Size", n}),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "%s: read %s",
+ logMsg(les), humanize.IBytes(uint64(n)),
+ )
+ },
+ )
}
state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Payload: buf,
})
ourSize := freq.Offset + uint64(len(buf))
- lesp = append(lesp, LE{"Size", int64(ourSize)})
- lesp = append(lesp, LE{"FullSize", fullSize})
+ lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
if state.Ctx.ShowPrgrs {
Progress("Tx", lesp)
}
state.Lock()
if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
if ourSize == uint64(fullSize) {
- state.Ctx.LogD("sp-file", lesp, "finished")
+ state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
+ return logMsg(les) + ": finished"
+ })
if len(state.queueTheir) > 1 {
state.queueTheir = state.queueTheir[1:]
} else {
state.queueTheir[0].freq.Offset += uint64(len(buf))
}
} else {
- state.Ctx.LogD("sp-file", lesp, "queue disappeared")
+ state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
+ return logMsg(les) + ": queue disappeared"
+ })
}
state.Unlock()
}
- state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): sending %s",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ }
+ state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
- state.Ctx.LogE("sp-xmit", les, err, "")
+ state.Ctx.LogE("sp-sending", les, err, logMsg)
return
}
}
if state.NotAlive() {
break
}
- state.Ctx.LogD("sp-recv", les, "waiting for payload")
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): waiting for payload",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ }
+ state.Ctx.LogD("sp-recv-wait", les, logMsg)
conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
payload, err := state.ReadSP(conn)
if err != nil {
if unmarshalErr.ErrorCode == xdr.ErrIO {
break
}
- state.Ctx.LogE("sp-recv", les, err, "")
+ state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
break
}
+ logMsg = func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): payload (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(payload))),
+ )
+ }
state.Ctx.LogD(
- "sp-recv",
+ "sp-recv-got",
append(les, LE{"Size", len(payload)}),
- "got payload",
+ func(les LEs) string { return logMsg(les) + ": got" },
)
payload, err = state.csTheir.Decrypt(nil, nil, payload)
if err != nil {
- state.Ctx.LogE("sp-recv", les, err, "")
+ state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
+ return logMsg(les) + ": got"
+ })
break
}
state.Ctx.LogD(
- "sp-recv",
+ "sp-recv-process",
append(les, LE{"Size", len(payload)}),
- "processing",
+ func(les LEs) string {
+ return logMsg(les) + ": processing"
+ },
)
replies, err := state.ProcessSP(payload)
if err != nil {
- state.Ctx.LogE("sp-recv", les, err, "")
+ state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
+ return logMsg(les) + ": processing"
+ })
break
}
state.wg.Add(1)
go func() {
for _, reply := range replies {
state.Ctx.LogD(
- "sp-recv",
- append(les, LE{"Size", len(reply)}),
- "queuing reply",
+ "sp-recv-reply",
+ append(les[:len(les)-1], LE{"Size", len(reply)}),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): queuing reply (%s)",
+ state.Node.Name, NicenessFmt(state.Nice),
+ humanize.IBytes(uint64(len(reply))),
+ )
+ },
)
state.payloads <- reply
}
var replies [][]byte
var infosGot bool
for r.Len() > 0 {
- state.Ctx.LogD("sp-process", les, "unmarshaling header")
+ state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling header",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
var head SPHead
if _, err = xdr.Unmarshal(r, &head); err != nil {
- state.Ctx.LogE("sp-process", les, err, "")
+ state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling header",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
return nil, err
}
if head.Type != SPTypePing {
}
switch head.Type {
case SPTypeHalt:
- state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
+ state.Ctx.LogD(
+ "sp-process-halt",
+ append(les, LE{"Type", "halt"}), func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): got HALT",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ },
+ )
state.Lock()
state.queueTheir = nil
state.Unlock()
case SPTypePing:
- state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+ state.Ctx.LogD(
+ "sp-process-ping",
+ append(les, LE{"Type", "ping"}),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): got PING",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ },
+ )
case SPTypeInfo:
infosGot = true
lesp := append(les, LE{"Type", "info"})
- state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
+ state.Ctx.LogD(
+ "sp-process-info-unmarshal", lesp,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling INFO",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ },
+ )
var info SPInfo
if _, err = xdr.Unmarshal(r, &info); err != nil {
- state.Ctx.LogE("sp-process", lesp, err, "")
+ state.Ctx.LogE(
+ "sp-process-info-unmarshal", lesp, err,
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling INFO",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ },
+ )
return nil, err
}
- lesp = append(lesp, LEs{
- {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
- {"Size", int64(info.Size)},
- {"Nice", int(info.Nice)},
- }...)
+ pktName := Base32Codec.EncodeToString(info.Hash[:])
+ lesp = append(
+ lesp,
+ LE{"Pkt", pktName},
+ LE{"Size", int64(info.Size)},
+ LE{"PktNice", int(info.Nice)},
+ )
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): INFO %s (%s) nice %s",
+ state.Node.Name, NicenessFmt(state.Nice),
+ pktName,
+ humanize.IBytes(info.Size),
+ NicenessFmt(info.Nice),
+ )
+ }
if !state.listOnly && info.Nice > state.Nice {
- state.Ctx.LogD("sp-process", lesp, "too nice")
+ state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
+ return logMsg(les) + ": too nice"
+ })
continue
}
- state.Ctx.LogD("sp-process", lesp, "received")
+ state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
+ return logMsg(les) + ": received"
+ })
if !state.listOnly && state.xxOnly == TTx {
continue
}
state.Lock()
state.infosTheir[*info.Hash] = &info
state.Unlock()
- state.Ctx.LogD("sp-process", lesp, "stating part")
+ state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
+ return logMsg(les) + ": stating part"
+ })
pktPath := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TRx),
Base32Codec.EncodeToString(info.Hash[:]),
)
+ logMsg = func(les LEs) string {
+ return fmt.Sprintf(
+ "Packet %s (%s) (nice %s)",
+ pktName,
+ humanize.IBytes(info.Size),
+ NicenessFmt(info.Nice),
+ )
+ }
if _, err = os.Stat(pktPath); err == nil {
- state.Ctx.LogI("sp-info", lesp, "already done")
+ state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
+ return logMsg(les) + ": already done"
+ })
if !state.listOnly {
replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
}
continue
}
if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
- state.Ctx.LogI("sp-info", lesp, "already seen")
+ state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
+ return logMsg(les) + ": already seen"
+ })
if !state.listOnly {
replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
}
continue
}
if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
- state.Ctx.LogI("sp-info", lesp, "still non checksummed")
+ state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
+ return logMsg(les) + ": still not checksummed"
+ })
continue
}
fi, err := os.Stat(pktPath + PartSuffix)
offset = fi.Size()
}
if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
- state.Ctx.LogI("sp-info", lesp, "not enough space")
+ state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
+ return logMsg(les) + ": not enough space"
+ })
continue
}
- state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
+ state.Ctx.LogI(
+ "sp-info",
+ append(lesp, LE{"Offset", offset}),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
+ )
+ },
+ )
if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
replies = append(replies, MarshalSP(
SPTypeFreq,
case SPTypeFile:
lesp := append(les, LE{"Type", "file"})
- state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
+ state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling FILE",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
var file SPFile
if _, err = xdr.Unmarshal(r, &file); err != nil {
- state.Ctx.LogE("sp-process", lesp, err, "")
+ state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling FILE",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
return nil, err
}
- lesp = append(lesp, LEs{
- {"XX", string(TRx)},
- {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
- {"Size", len(file.Payload)},
- }...)
+ pktName := Base32Codec.EncodeToString(file.Hash[:])
+ lesp = append(
+ lesp,
+ LE{"XX", string(TRx)},
+ LE{"Pkt", pktName},
+ LE{"Size", len(file.Payload)},
+ )
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Got packet %s (%s)",
+ pktName, humanize.IBytes(uint64(len(file.Payload))),
+ )
+ }
dirToSync := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TRx),
)
- filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+ filePath := filepath.Join(dirToSync, pktName)
filePathPart := filePath + PartSuffix
- state.Ctx.LogD("sp-file", lesp, "opening part")
+ state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
+ return logMsg(les) + ": opening part"
+ })
fdAndFullSize, exists := state.fds[filePathPart]
var fd *os.File
if exists {
os.FileMode(0666),
)
if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": opening part"
+ })
return nil, err
}
state.fds[filePathPart] = FdAndFullSize{fd: fd}
state.fileHashers[filePath] = &HasherAndOffset{h: h}
}
}
- state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
+ state.Ctx.LogD(
+ "sp-file-seek",
+ append(lesp, LE{"Offset", file.Offset}),
+ func(les LEs) string {
+ return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
+ })
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": seeking"
+ })
state.closeFd(filePathPart)
return nil, err
}
- state.Ctx.LogD("sp-file", lesp, "writing")
+ state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
+ return logMsg(les) + ": writing"
+ })
if _, err = fd.Write(file.Payload); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "")
+ state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": writing"
+ })
state.closeFd(filePathPart)
return nil, err
}
}
hasherAndOffset.offset += uint64(len(file.Payload))
} else {
- state.Ctx.LogE(
- "sp-file", lesp,
- errors.New("offset differs"),
- "deleting hasher",
+ state.Ctx.LogD(
+ "sp-file-offset-differs", lesp,
+ func(les LEs) string {
+ return logMsg(les) + ": offset differs, deleting hasher"
+ },
)
delete(state.fileHashers, filePath)
hasherExists = false
if fullsize != ourSize {
continue
}
+ logMsg = func(les LEs) string {
+ return fmt.Sprintf(
+ "Got packet %s %d%% (%s / %s)",
+ pktName, 100*ourSize/fullsize,
+ humanize.IBytes(uint64(ourSize)),
+ humanize.IBytes(uint64(fullsize)),
+ )
+ }
err = fd.Sync()
if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "sync")
+ state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": syncing"
+ })
state.closeFd(filePathPart)
continue
}
if hasherExists {
if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
- state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
+ state.Ctx.LogE(
+ "sp-file-bad-checksum", lesp,
+ errors.New("checksum mismatch"),
+ logMsg,
+ )
continue
}
if err = os.Rename(filePathPart, filePath); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "rename")
+ state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": renaming"
+ })
continue
}
if err = DirSync(dirToSync); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "sync")
+ state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": dirsyncing"
+ })
continue
}
- state.Ctx.LogI("sp-file", lesp, "done")
+ state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
+ return logMsg(les) + ": done"
+ })
state.wg.Add(1)
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
continue
}
if _, err = fd.Seek(0, io.SeekStart); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "seek")
+ state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": seeking"
+ })
state.closeFd(filePathPart)
continue
}
_, pktEncRaw, err := state.Ctx.HdrRead(fd)
state.closeFd(filePathPart)
if err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
+ state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": HdrReading"
+ })
continue
}
state.Ctx.HdrWrite(pktEncRaw, filePath)
}
state.closeFd(filePathPart)
if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "rename")
+ state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": renaming"
+ })
continue
}
if err = DirSync(dirToSync); err != nil {
- state.Ctx.LogE("sp-file", lesp, err, "sync")
+ state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": dirsyncing"
+ })
continue
}
- state.Ctx.LogI("sp-file", lesp, "downloaded")
+ state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
+ return logMsg(les) + ": downloaded"
+ })
state.Lock()
delete(state.infosTheir, *file.Hash)
state.Unlock()
case SPTypeDone:
lesp := append(les, LE{"Type", "done"})
- state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
+ state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling DONE",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
var done SPDone
if _, err = xdr.Unmarshal(r, &done); err != nil {
- state.Ctx.LogE("sp-process", lesp, err, "")
+ state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling DONE",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
return nil, err
}
- lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
- lesp = append(lesp, LE{"XX", string(TTx)})
- state.Ctx.LogD("sp-done", lesp, "removing")
+ pktName := Base32Codec.EncodeToString(done.Hash[:])
+ lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): DONE: removing %s",
+ state.Node.Name, NicenessFmt(state.Nice), pktName,
+ )
+ }
+ state.Ctx.LogD("sp-done", lesp, logMsg)
pth := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
- Base32Codec.EncodeToString(done.Hash[:]),
+ pktName,
)
if err = os.Remove(pth); err == nil {
- state.Ctx.LogI("sp-done", lesp, "")
+ state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
+ return fmt.Sprintf("Packet %s is sent", pktName)
+ })
if state.Ctx.HdrUsage {
os.Remove(pth + HdrSuffix)
}
} else {
- state.Ctx.LogE("sp-done", lesp, err, "")
+ state.Ctx.LogE("sp-done", lesp, err, logMsg)
}
case SPTypeFreq:
lesp := append(les, LE{"Type", "freq"})
- state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
+ state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling FREQ",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
var freq SPFreq
if _, err = xdr.Unmarshal(r, &freq); err != nil {
- state.Ctx.LogE("sp-process", lesp, err, "")
+ state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): unmarshaling FREQ",
+ state.Node.Name, NicenessFmt(state.Nice),
+ )
+ })
return nil, err
}
- lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
- lesp = append(lesp, LE{"Offset", freq.Offset})
- state.Ctx.LogD("sp-process", lesp, "queueing")
+ pktName := Base32Codec.EncodeToString(freq.Hash[:])
+ lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
+ state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): FREQ %s: queuing",
+ state.Node.Name, NicenessFmt(state.Nice), pktName,
+ )
+ })
nice, exists := state.infosOurSeen[*freq.Hash]
if exists {
if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
state.Unlock()
} else {
- state.Ctx.LogD("sp-process", lesp, "skipping")
+ state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): FREQ %s: skipping",
+ state.Node.Name, NicenessFmt(state.Nice), pktName,
+ )
+ })
}
} else {
- state.Ctx.LogD("sp-process", lesp, "unknown")
+ state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): FREQ %s: unknown",
+ state.Node.Name, NicenessFmt(state.Nice), pktName,
+ )
+ })
}
default:
state.Ctx.LogE(
- "sp-process",
+ "sp-process-type-unknown",
append(les, LE{"Type", head.Type}),
errors.New("unknown type"),
- "",
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "SP with %s (nice %s): %d",
+ state.Node.Name, NicenessFmt(state.Nice), head.Type,
+ )
+ },
)
return nil, BadPktType
}
}
+
if infosGot {
var pkts int
var size uint64
size += info.Size
}
state.RUnlock()
- state.Ctx.LogI("sp-infos", LEs{
+ state.Ctx.LogI("sp-infos-rx", LEs{
{"XX", string(TRx)},
{"Node", state.Node.Id},
{"Pkts", pkts},
{"Size", int64(size)},
- }, "")
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "%s has got for us: %d packets, %s",
+ state.Node.Name, pkts, humanize.IBytes(size),
+ )
+ })
}
return payloadsSplit(replies), nil
}
import (
"bufio"
+ "fmt"
"hash"
"io"
"os"
}
fd, err := TempFile(jobsPath, "")
if err == nil {
- ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, "created")
+ ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, func(les LEs) string {
+ return "Temporary file created: %s" + fd.Name()
+ })
}
return fd, err
}
return err
}
checksum := tmp.Checksum()
- tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit")
+ tmp.ctx.LogD(
+ "tmp-rename",
+ LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}},
+ func(les LEs) string {
+ return fmt.Sprintf("Temporary file: %s -> %s", tmp.Fd.Name(), checksum)
+ },
+ )
if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil {
return err
}
"Subject: " + mime.BEncoding.Encode("UTF-8", subject),
}
if len(body) > 0 {
- lines = append(lines, []string{
+ lines = append(
+ lines,
"MIME-Version: 1.0",
"Content-Type: text/plain; charset=utf-8",
"Content-Transfer-Encoding: base64",
"",
base64.StdEncoding.EncodeToString(body),
- }...)
+ )
}
return strings.NewReader(strings.Join(lines, "\n"))
}
) bool {
dirLock, err := ctx.LockDir(nodeId, "toss")
if err != nil {
- ctx.LogE("rx", LEs{}, err, "lock")
return false
}
defer ctx.UnlockDir(dirLock)
defer decompressor.Close()
for job := range ctx.Jobs(nodeId, TRx) {
pktName := filepath.Base(job.Path)
- les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
+ les := LEs{
+ {"Node", job.PktEnc.Sender},
+ {"Pkt", pktName},
+ {"Nice", int(job.PktEnc.Nice)},
+ }
if job.PktEnc.Nice > nice {
- ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
+ ctx.LogD("rx-too-nice", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: too nice: %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ NicenessFmt(job.PktEnc.Nice),
+ )
+ })
continue
}
fd, err := os.Open(job.Path)
if err != nil {
- ctx.LogE("rx", les, err, "open")
+ ctx.LogE("rx-open", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: opening %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
+ )
+ })
isBad = true
continue
}
var pktSize int64
var pktSizeBlocks int64
if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
- ctx.LogE("rx", les, err, "unmarshal")
+ ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s: unmarshal",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ )
+ })
isBad = true
goto Closing
}
}
pktSize -= pktSizeBlocks * poly1305.TagSize
les = append(les, LE{"Size", pktSize})
- ctx.LogD("rx", les, "taken")
+ ctx.LogD("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s)",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)),
+ )
+ })
+
switch pkt.Type {
case PktTypeExec, PktTypeExecFat:
if noExec {
args = append(args, string(p))
}
argsStr := strings.Join(append([]string{handle}, args...), " ")
- les = append(les, LEs{
- {"Type", "exec"},
- {"Dst", argsStr},
- }...)
+ les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
sender := ctx.Neigh[*job.PktEnc.Sender]
cmdline, exists := sender.Exec[handle]
if !exists || len(cmdline) == 0 {
- ctx.LogE("rx", les, errors.New("No handle found"), "")
+ ctx.LogE(
+ "rx-no-handle", les, errors.New("No handle found"),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), argsStr,
+ )
+ },
+ )
isBad = true
goto Closing
}
}
}
if !dryRun {
- cmd := exec.Command(
- cmdline[0],
- append(cmdline[1:], args...)...,
- )
+ cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
cmd.Env = append(
cmd.Env,
"NNCP_SELF="+ctx.Self.Id.String(),
}
output, err := cmd.Output()
if err != nil {
- ctx.LogE("rx", les, err, "handle")
+ ctx.LogE("rx-hande", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s: handling",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), argsStr,
+ )
+ })
isBad = true
goto Closing
}
"Exec from %s: %s", sender.Name, argsStr,
), output)
if err = cmd.Run(); err != nil {
- ctx.LogE("rx", les, err, "notify")
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s: notifying",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), argsStr,
+ )
+ })
}
}
}
}
- ctx.LogI("rx", les, "")
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got exec from %s to %s (%s)",
+ ctx.NodeName(job.PktEnc.Sender), argsStr,
+ humanize.IBytes(uint64(pktSize)),
+ )
+ })
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Path); err != nil {
- ctx.LogE("rx", les, err, "remove")
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing exec %s/%s (%s): %s: notifying",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), argsStr,
+ )
+ })
isBad = true
} else if ctx.HdrUsage {
os.Remove(job.Path + HdrSuffix)
}
}
+
case PktTypeFile:
if noFile {
goto Closing
}
dst := string(pkt.Path[:int(pkt.PathLen)])
- les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...)
+ les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
if filepath.IsAbs(dst) {
- ctx.LogE("rx", les, errors.New("non-relative destination path"), "")
+ ctx.LogE(
+ "rx-non-rel", les, errors.New("non-relative destination path"),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ },
+ )
isBad = true
goto Closing
}
incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
if incoming == nil {
- ctx.LogE("rx", les, errors.New("incoming is not allowed"), "")
+ ctx.LogE(
+ "rx-no-incoming", les, errors.New("incoming is not allowed"),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ },
+ )
isBad = true
goto Closing
}
dir := filepath.Join(*incoming, path.Dir(dst))
if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
- ctx.LogE("rx", les, err, "mkdir")
+ ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: mkdir",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
goto Closing
}
if !dryRun {
tmp, err := TempFile(dir, "file")
if err != nil {
- ctx.LogE("rx", les, err, "mktemp")
+ ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: mktemp",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
goto Closing
}
les = append(les, LE{"Tmp", tmp.Name()})
- ctx.LogD("rx", les, "created")
+ ctx.LogD("rx-tmp-created", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: created: %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst, tmp.Name(),
+ )
+ })
bufW := bufio.NewWriter(tmp)
if _, err = CopyProgressed(
bufW, pipeR, "Rx file",
append(les, LE{"FullSize", pktSize}),
ctx.ShowPrgrs,
); err != nil {
- ctx.LogE("rx", les, err, "copy")
+ ctx.LogE("rx-copy", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: copying",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
goto Closing
}
if err = bufW.Flush(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("rx", les, err, "copy")
+ ctx.LogE("rx-flush", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: flushing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
goto Closing
}
if err = tmp.Sync(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("rx", les, err, "copy")
+ ctx.LogE("rx-sync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: syncing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
goto Closing
}
if err = tmp.Close(); err != nil {
- ctx.LogE("rx", les, err, "copy")
+ ctx.LogE("rx-close", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: closing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
goto Closing
}
if os.IsNotExist(err) {
break
}
- ctx.LogE("rx", les, err, "stat")
+ ctx.LogE("rx-stat", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: stating: %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst, dstPath,
+ )
+ })
isBad = true
goto Closing
}
dstPathCtr++
}
if err = os.Rename(tmp.Name(), dstPath); err != nil {
- ctx.LogE("rx", les, err, "rename")
+ ctx.LogE("rx-rename", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: renaming",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
}
if err = DirSync(*incoming); err != nil {
- ctx.LogE("rx", les, err, "sync")
+ ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: dirsyncing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
}
les = les[:len(les)-1] // delete Tmp
}
- ctx.LogI("rx", les, "")
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got file %s (%s) from %s",
+ dst, humanize.IBytes(uint64(pktSize)),
+ ctx.NodeName(job.PktEnc.Sender),
+ )
+ })
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Path); err != nil {
- ctx.LogE("rx", les, err, "remove")
+ ctx.LogE("rx-remove", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: removing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
isBad = true
} else if ctx.HdrUsage {
os.Remove(job.Path + HdrSuffix)
humanize.IBytes(uint64(pktSize)),
), nil)
if err = cmd.Run(); err != nil {
- ctx.LogE("rx", les, err, "notify")
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing file %s/%s (%s): %s: notifying",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), dst,
+ )
+ })
}
}
}
+
case PktTypeFreq:
if noFreq {
goto Closing
}
src := string(pkt.Path[:int(pkt.PathLen)])
+ les := append(les, LE{"Type", "freq"}, LE{"Src", src})
if filepath.IsAbs(src) {
- ctx.LogE("rx", les, errors.New("non-relative source path"), "")
+ ctx.LogE(
+ "rx-non-rel", les, errors.New("non-relative source path"),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s: notifying",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), src,
+ )
+ },
+ )
isBad = true
goto Closing
}
- les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...)
dstRaw, err := ioutil.ReadAll(pipeR)
if err != nil {
- ctx.LogE("rx", les, err, "read")
+ ctx.LogE("rx-read", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s: reading",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), src,
+ )
+ })
isBad = true
goto Closing
}
sender := ctx.Neigh[*job.PktEnc.Sender]
freqPath := sender.FreqPath
if freqPath == nil {
- ctx.LogE("rx", les, errors.New("freqing is not allowed"), "")
+ ctx.LogE(
+ "rx-no-freq", les, errors.New("freqing is not allowed"),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), src, dst,
+ )
+ },
+ )
isBad = true
goto Closing
}
sender.FreqMaxSize,
)
if err != nil {
- ctx.LogE("rx", les, err, "tx file")
+ ctx.LogE("rx-tx", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s: txing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), src, dst,
+ )
+ })
isBad = true
goto Closing
}
}
- ctx.LogI("rx", les, "")
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got file request %s to %s",
+ src, ctx.NodeName(job.PktEnc.Sender),
+ )
+ })
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Path); err != nil {
- ctx.LogE("rx", les, err, "remove")
+ ctx.LogE("rx-remove", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s: removing",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), src, dst,
+ )
+ })
isBad = true
} else if ctx.HdrUsage {
os.Remove(job.Path + HdrSuffix)
"Freq from %s: %s", sender.Name, src,
), nil)
if err = cmd.Run(); err != nil {
- ctx.LogE("rx", les, err, "notify")
+ ctx.LogE("rx-notify", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing freq %s/%s (%s): %s -> %s: notifying",
+ ctx.NodeName(job.PktEnc.Sender), pktName,
+ humanize.IBytes(uint64(pktSize)), src, dst,
+ )
+ })
}
}
}
+
case PktTypeTrns:
if noTrns {
goto Closing
copy(dst[:], pkt.Path[:int(pkt.PathLen)])
nodeId := NodeId(*dst)
node, known := ctx.Neigh[nodeId]
- les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...)
+ les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing trns %s/%s (%s): %s",
+ ctx.NodeName(job.PktEnc.Sender),
+ pktName,
+ humanize.IBytes(uint64(pktSize)),
+ nodeId.String(),
+ )
+ }
if !known {
- ctx.LogE("rx", les, errors.New("unknown node"), "")
+ ctx.LogE("rx-unknown", les, errors.New("unknown node"), logMsg)
isBad = true
goto Closing
}
- ctx.LogD("rx", les, "taken")
+ ctx.LogD("rx-tx", les, logMsg)
if !dryRun {
if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
- ctx.LogE("rx", les, err, "tx trns")
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return logMsg(les) + ": txing"
+ })
isBad = true
goto Closing
}
}
- ctx.LogI("rx", les, "")
+ ctx.LogI("rx", les, func(les LEs) string {
+ return fmt.Sprintf(
+ "Got transitional packet from %s to %s (%s)",
+ ctx.NodeName(job.PktEnc.Sender),
+ ctx.NodeName(&nodeId),
+ humanize.IBytes(uint64(pktSize)),
+ )
+ })
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Path); err != nil {
- ctx.LogE("rx", les, err, "remove")
+ ctx.LogE("rx", les, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing trns %s/%s (%s): %s: removing",
+ ctx.NodeName(job.PktEnc.Sender),
+ pktName,
+ humanize.IBytes(uint64(pktSize)),
+ ctx.NodeName(&nodeId),
+ )
+ })
isBad = true
} else if ctx.HdrUsage {
os.Remove(job.Path + HdrSuffix)
}
}
+
default:
- ctx.LogE("rx", les, errors.New("unknown type"), "")
+ ctx.LogE(
+ "rx-type-unknown", les, errors.New("unknown type"),
+ func(les LEs) string {
+ return fmt.Sprintf(
+ "Tossing %s/%s (%s)",
+ ctx.NodeName(job.PktEnc.Sender),
+ pktName,
+ humanize.IBytes(uint64(pktSize)),
+ )
+ },
+ )
isBad = true
}
Closing:
"bytes"
"crypto/rand"
"errors"
+ "fmt"
"hash"
"io"
"io/ioutil"
"time"
xdr "github.com/davecgh/go-xdr/xdr2"
+ "github.com/dustin/go-humanize"
"github.com/klauspost/compress/zstd"
"golang.org/x/crypto/blake2b"
"golang.org/x/crypto/chacha20poly1305"
{"Node", hops[0].Id},
{"Nice", int(nice)},
{"Size", size},
- }, "wrote")
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tx packet to %s (%s) nice: %s",
+ ctx.NodeName(hops[0].Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ })
pktEncRaw, err = PktEncWrite(
ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
)
{"Node", node.Id},
{"Nice", int(nice)},
{"Size", size},
- }, "trns wrote")
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Tx trns packet to %s (%s) nice: %s",
+ ctx.NodeName(node.Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ })
_, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
errs <- err
dst.Close() // #nosec G104
{"Dst", dstPath},
{"Size", fileSize},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(fileSize)),
+ ctx.NodeName(node.Id),
+ dstPath,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
{"Dst", path},
{"Size", sizeToSend},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(sizeToSend)),
+ ctx.NodeName(node.Id),
+ path,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
return err
}
hsh.Sum(metaPkt.Checksums[chunkNum][:0])
{"Dst", path},
{"Size", metaPktSize},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File %s (%s) sent to %s:%s",
+ srcPath,
+ humanize.IBytes(uint64(metaPktSize)),
+ ctx.NodeName(node.Id),
+ path,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
{"Src", srcPath},
{"Dst", dstPath},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "File request from %s:%s to %s sent",
+ ctx.NodeName(node.Id), srcPath,
+ dstPath,
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
}
+ dst := strings.Join(append([]string{handle}, args...), " ")
les := LEs{
{"Type", "exec"},
{"Node", node.Id},
{"Nice", int(nice)},
{"ReplyNice", int(replyNice)},
- {"Dst", strings.Join(append([]string{handle}, args...), " ")},
+ {"Dst", dst},
{"Size", size},
}
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Exec sent to %s@%s (%s)",
+ ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+ )
+ }
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogE("tx", les, err, "sent")
+ ctx.LogE("tx", les, err, logMsg)
}
return err
}
{"Nice", int(nice)},
{"Size", size},
}
- ctx.LogD("tx", les, "taken")
+ logMsg := func(les LEs) string {
+ return fmt.Sprintf(
+ "Transitional packet to %s (%s) (nice %s)",
+ ctx.NodeName(node.Id),
+ humanize.IBytes(uint64(size)),
+ NicenessFmt(nice),
+ )
+ }
+ ctx.LogD("tx", les, logMsg)
if !ctx.IsEnoughSpace(size) {
err := errors.New("is not enough space")
- ctx.LogE("tx", les, err, err.Error())
+ ctx.LogE("tx", les, err, logMsg)
return err
}
tmp, err := ctx.NewTmpFileWHash()
nodePath := filepath.Join(ctx.Spool, node.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
if err == nil {
- ctx.LogI("tx", les, "sent")
+ ctx.LogI("tx", les, logMsg)
} else {
- ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
+ ctx.LogI("tx", append(les, LE{"Err", err}), logMsg)
}
os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
return err