$ nncp-log [options]
@end example
-Parse @ref{Log, log} file and print out its records in human-readable form.
+Parse @ref{Log, log} file and print out its records in short
+human-readable form.
@node nncp-pkt
@section nncp-pkt
@node Log
@unnumbered Log format
-Log is a plaintext file with single log entry per line. Lines are "\n"
-separated. It is not intended to be read by human -- use @ref{nncp-log}
-utility.
-
-Each line has the following format:
-
-@verbatim
-LEVEL | DATETIME | SD | MSG
-@end verbatim
-
-Example log records:
-
-@verbatim
-I 2017-01-09T08:41:54.751732131Z [nncp-xfer node="VHMTRWDOXPLK7BR55ICZ5N32ZJUMRKZEMFNGGCEAXV66GG43PEBQ" pkt="KMG6FO5UNEK7HWVFJPWQYC7MOZ76KEZ4FWCGM62PWA2QE5755NPA" size="4162548" xx="tx"]
-I 2017-01-09T08:42:18.990005394Z [sp-infos node="BYRRQUULEHINPKEFN7CHMSHR5I5CK7PMX5HQNCYERTBAR4BOCG6Q" pkts="0" size="0" xx="tx"]
-I 2017-01-09T08:48:59.264847401Z [call-finish duration="10" node="BYRRQUULEHINPKEFN7CHMSHR5I5CK7PMX5HQNCYERTBAR4BOCG6Q" rxbytes="60" rxspeed="60" txbytes="108" txspeed="108"]
-@end verbatim
-
-@table @emph
-@item |
- Space character.
-@item LEVEL
- Is single character log level. As a rule is is either @verb{|I|}
- (informational message), or @verb{|E|} (error message).
-@item DATETIME
- UTC datetime in @url{https://tools.ietf.org/html/rfc339, RFC 3339}
- @verb{|2006-01-02T15:04:05.999999999Z|} format.
-@item SD
- Structured data as in @url{https://tools.ietf.org/html/rfc5424, RFC 5424}.
-@item MSG
- Arbitrary UTF-8 encoded text data.
-@end table
+Log is a plaintext file consisting of
+@url{https://www.gnu.org/software/recutils/, recfile} records. It can be
+read by human, but it is better to use either @ref{nncp-log}, or
+@command{recutils} utilities for selecting and formatting the required
+fields.
@node Новости
@section Новости
-@node Релиз 5.7.0
-@subsection Релиз 5.7.0
+@node Релиз 6.0.0
+@subsection Релиз 6.0.0
@itemize
+@item
+Журнал использует человеко-читаемый и легко обрабатываемый машиной
+@url{https://www.gnu.org/software/recutils/, recfile} формат для своих
+записей, вместо структурированных строчек RFC 3339. Старый формат
+журналов не поддерживается @command{nncp-log}.
+
@item
Работоспособность @option{-autotoss*} опции с @option{-inetd} режимом
@command{nncp-daemon}.
See also this page @ref{Новости, on russian}.
-@node Release 5.7.0
-@section Release 5.7.0
+@node Release 6.0.0
+@section Release 6.0.0
@itemize
+@item Log uses human readable and easy machine parseable
+@url{https://www.gnu.org/software/recutils/, recfile} format for the
+records, instead of structured RFC 3339 lines. Old logs are not readable
+by @command{nncp-log} anymore.
+
@item
@option{-autotoss*} option workability with @command{nncp-daemon}'s
@option{-inetd} mode.
onlyPkts map[[32]byte]bool,
) (isGood bool) {
for _, addr := range addrs {
- sds := SDS{"node": node.Id, "addr": addr}
- ctx.LogD("call", sds, "dialing")
+ les := LEs{{"Node", node.Id}, {"Addr", addr}}
+ ctx.LogD("call", les, "dialing")
var conn ConnDeadlined
var err error
if addr[0] == '|' {
conn, err = net.Dial("tcp", addr)
}
if err != nil {
- ctx.LogD("call", SdsAdd(sds, SDS{"err": err}), "dialing")
+ ctx.LogD("call", append(les, LE{"Err", err}), "dialing")
continue
}
- ctx.LogD("call", sds, "connected")
+ ctx.LogD("call", les, "connected")
state := SPState{
Ctx: ctx,
Node: node,
onlyPkts: onlyPkts,
}
if err = state.StartI(conn); err == nil {
- ctx.LogI("call-start", sds, "connected")
+ ctx.LogI("call-start", les, "connected")
state.Wait()
- ctx.LogI("call-finish", SDS{
- "node": state.Node.Id,
- "duration": int64(state.Duration.Seconds()),
- "rxbytes": state.RxBytes,
- "txbytes": state.TxBytes,
- "rxspeed": state.RxSpeed,
- "txspeed": state.TxSpeed,
+ ctx.LogI("call-finish", LEs{
+ {"Node", state.Node.Id},
+ {"Duration", int64(state.Duration.Seconds())},
+ {"RxBytes", state.RxBytes},
+ {"TxBytes", state.TxBytes},
+ {"RxSpeed", state.RxSpeed},
+ {"TxSpeed", state.TxSpeed},
}, "")
isGood = true
conn.Close() // #nosec G104
break
} else {
- ctx.LogE("call-start", sds, err, "")
+ ctx.LogE("call-start", les, err, "")
conn.Close() // #nosec G104
}
}
"golang.org/x/crypto/blake2b"
)
-func Check(src io.Reader, checksum []byte, sds SDS, showPrgrs bool) (bool, error) {
+func Check(src io.Reader, checksum []byte, les LEs, showPrgrs bool) (bool, error) {
hsh, err := blake2b.New256(nil)
if err != nil {
log.Fatalln(err)
}
- if _, err = CopyProgressed(hsh, bufio.NewReader(src), "check", sds, showPrgrs); err != nil {
+ if _, err = CopyProgressed(hsh, bufio.NewReader(src), "check", les, showPrgrs); err != nil {
return false, err
}
return bytes.Compare(hsh.Sum(nil), checksum) == 0, nil
func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool {
isBad := false
for job := range ctx.Jobs(nodeId, xx) {
- sds := SDS{
- "xx": string(xx),
- "node": nodeId,
- "pkt": Base32Codec.EncodeToString(job.HshValue[:]),
- "fullsize": job.Size,
+ les := LEs{
+ {"XX", string(xx)},
+ {"Node", nodeId},
+ {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])},
+ {"FullSize", job.Size},
}
- gut, err := Check(job.Fd, job.HshValue[:], sds, ctx.ShowPrgrs)
+ gut, err := Check(job.Fd, job.HshValue[:], les, ctx.ShowPrgrs)
job.Fd.Close() // #nosec G104
if err != nil {
- ctx.LogE("check", sds, err, "")
+ ctx.LogE("check", les, err, "")
return true
}
if !gut {
isBad = true
- ctx.LogE("check", sds, errors.New("bad"), "")
+ ctx.LogE("check", les, errors.New("bad"), "")
}
}
return isBad
ctx.Umask()
- sds := nncp.SDS{}
if *doTx {
- sds["xx"] = string(nncp.TTx)
var pktName string
bufStdout := bufio.NewWriter(os.Stdout)
tarWr := tar.NewWriter(bufStdout)
- for nodeId, _ := range nodeIds {
- sds["node"] = nodeId.String()
+ for nodeId := range nodeIds {
+ les := nncp.LEs{
+ {K: "XX", V: string(nncp.TTx)},
+ {K: "Node", V: nodeId.String()},
+ {K: "Pkt", V: "dummy"},
+ }
for job := range ctx.Jobs(&nodeId, nncp.TTx) {
pktName = filepath.Base(job.Fd.Name())
- sds["pkt"] = pktName
+ les[len(les)-1].V = pktName
if job.PktEnc.Nice > nice {
- ctx.LogD("nncp-bundle", sds, "too nice")
+ ctx.LogD("nncp-bundle", les, "too nice")
job.Fd.Close() // #nosec G104
continue
}
}
if _, err = nncp.CopyProgressed(
tarWr, job.Fd, "Tx",
- nncp.SdsAdd(sds, nncp.SDS{
- "pkt": nncp.Base32Codec.EncodeToString(job.HshValue[:]),
- "fullsize": job.Size,
- }),
+ append(les, nncp.LEs{
+ {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])},
+ {K: "FullSize", V: job.Size},
+ }...),
ctx.ShowPrgrs,
); err != nil {
log.Fatalln("Error during copying to tar:", err)
log.Fatalln("Error during deletion:", err)
}
}
- ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"size": job.Size}), "")
+ ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "")
}
}
if err = tarWr.Close(); err != nil {
panic(err)
}
tarR := tar.NewReader(bufStdin)
- sds["xx"] = string(nncp.TRx)
entry, err := tarR.Next()
if err != nil {
if err != io.EOF {
ctx.LogD(
"nncp-bundle",
- nncp.SdsAdd(sds, nncp.SDS{"err": err}),
+ nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
"error reading tar",
)
}
continue
}
if entry.Typeflag != tar.TypeDir {
- ctx.LogD("nncp-bundle", sds, "Expected NNCP/")
+ ctx.LogD(
+ "nncp-bundle",
+ nncp.LEs{{K: "XX", V: string(nncp.TRx)}},
+ "Expected NNCP/",
+ )
continue
}
entry, err = tarR.Next()
if err != io.EOF {
ctx.LogD(
"nncp-bundle",
- nncp.SdsAdd(sds, nncp.SDS{"err": err}),
+ nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}},
"error reading tar",
)
}
continue
}
- sds["pkt"] = entry.Name
+ les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}}
if entry.Size < nncp.PktEncOverhead {
- ctx.LogD("nncp-bundle", sds, "Too small packet")
+ ctx.LogD("nncp-bundle", les, "Too small packet")
continue
}
if !ctx.IsEnoughSpace(entry.Size) {
- ctx.LogE("nncp-bundle", sds, errors.New("not enough spool space"), "")
+ ctx.LogE("nncp-bundle", les, errors.New("not enough spool space"), "")
continue
}
pktName := filepath.Base(entry.Name)
if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil {
- ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": "bad packet name"}), "")
+ ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: "bad packet name"}), "")
continue
}
if _, err = io.ReadFull(tarR, pktEncBuf); err != nil {
- ctx.LogD("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read")
+ ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: err}), "read")
continue
}
if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil {
- ctx.LogD("nncp-bundle", sds, "Bad packet structure")
+ ctx.LogD("nncp-bundle", les, "Bad packet structure")
continue
}
if pktEnc.Magic != nncp.MagicNNCPEv4 {
- ctx.LogD("nncp-bundle", sds, "Bad packet magic number")
+ ctx.LogD("nncp-bundle", les, "Bad packet magic number")
continue
}
if pktEnc.Nice > nice {
- ctx.LogD("nncp-bundle", sds, "too nice")
+ ctx.LogD("nncp-bundle", les, "too nice")
continue
}
if *pktEnc.Sender == *ctx.SelfId && *doDelete {
if len(nodeIds) > 0 {
if _, exists := nodeIds[*pktEnc.Recipient]; !exists {
- ctx.LogD("nncp-bundle", sds, "Recipient is not requested")
+ ctx.LogD("nncp-bundle", les, "Recipient is not requested")
continue
}
}
nodeId32 := nncp.Base32Codec.EncodeToString(pktEnc.Recipient[:])
- sds["xx"] = string(nncp.TTx)
- sds["node"] = nodeId32
- sds["pkt"] = pktName
- dstPath := filepath.Join(
- ctx.Spool,
- nodeId32,
- string(nncp.TTx),
- pktName,
- )
+ les := nncp.LEs{
+ {K: "XX", V: string(nncp.TTx)},
+ {K: "Node", V: nodeId32},
+ {K: "Pkt", V: pktName},
+ }
+ dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName)
if _, err = os.Stat(dstPath); err != nil {
- ctx.LogD("nncp-bundle", sds, "Packet is already missing")
+ ctx.LogD("nncp-bundle", les, "Packet is already missing")
continue
}
hsh, err := blake2b.New256(nil)
}
if _, err = nncp.CopyProgressed(
hsh, tarR, "Rx",
- nncp.SdsAdd(sds, nncp.SDS{"fullsize": entry.Size}),
+ append(les, nncp.LE{K: "FullSize", V: entry.Size}),
ctx.ShowPrgrs,
); err != nil {
log.Fatalln("Error during copying:", err)
}
if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName {
- ctx.LogI("nncp-bundle", sds, "removed")
+ ctx.LogI("nncp-bundle", les, "removed")
if !*dryRun {
os.Remove(dstPath) // #nosec G104
}
} else {
- ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "")
+ ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
}
continue
}
if *pktEnc.Recipient != *ctx.SelfId {
- ctx.LogD("nncp-bundle", sds, "Unknown recipient")
+ ctx.LogD("nncp-bundle", les, "Unknown recipient")
continue
}
if len(nodeIds) > 0 {
if _, exists := nodeIds[*pktEnc.Sender]; !exists {
- ctx.LogD("nncp-bundle", sds, "Sender is not requested")
+ ctx.LogD("nncp-bundle", les, "Sender is not requested")
continue
}
}
sender := nncp.Base32Codec.EncodeToString(pktEnc.Sender[:])
- sds["node"] = sender
- sds["pkt"] = pktName
- sds["fullsize"] = entry.Size
+ les = nncp.LEs{
+ {K: "XX", V: string(nncp.TRx)},
+ {K: "Node", V: sender},
+ {K: "Pkt", V: pktName},
+ {K: "FullSize", V: entry.Size},
+ }
dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx))
dstPath := filepath.Join(dstDirPath, pktName)
if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-bundle", sds, "Packet already exists")
+ ctx.LogD("nncp-bundle", les, "Packet already exists")
continue
}
if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-bundle", sds, "Packet already exists")
+ ctx.LogD("nncp-bundle", les, "Packet already exists")
continue
}
if *doCheck {
if _, err = hsh.Write(pktEncBuf); err != nil {
log.Fatalln("Error during writing:", err)
}
- if _, err = nncp.CopyProgressed(hsh, tarR, "check", sds, ctx.ShowPrgrs); err != nil {
+ if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil {
log.Fatalln("Error during copying:", err)
}
if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName {
- ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "")
+ ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
continue
}
} else {
if _, err = tmp.W.Write(pktEncBuf); err != nil {
log.Fatalln("Error during writing:", err)
}
- if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", sds, ctx.ShowPrgrs); err != nil {
+ if _, err = nncp.CopyProgressed(tmp.W, tarR, "check", les, ctx.ShowPrgrs); err != nil {
log.Fatalln("Error during copying:", err)
}
if err = tmp.W.Flush(); err != nil {
log.Fatalln("Error during commiting:", err)
}
} else {
- ctx.LogE("nncp-bundle", sds, errors.New("bad checksum"), "")
+ ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "")
tmp.Cancel()
continue
}
}
} else {
if *dryRun {
- if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", sds, ctx.ShowPrgrs); err != nil {
+ if _, err = nncp.CopyProgressed(ioutil.Discard, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
log.Fatalln("Error during copying:", err)
}
} else {
if _, err = bufTmp.Write(pktEncBuf); err != nil {
log.Fatalln("Error during writing:", err)
}
- if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", sds, ctx.ShowPrgrs); err != nil {
+ if _, err = nncp.CopyProgressed(bufTmp, tarR, "Rx", les, ctx.ShowPrgrs); err != nil {
log.Fatalln("Error during copying:", err)
}
if err = bufTmp.Flush(); err != nil {
}
}
}
- ctx.LogI("nncp-bundle", nncp.SdsAdd(sds, nncp.SDS{
- "size": sds["fullsize"],
- }), "")
+ for _, le := range les {
+ if le.K == "FullSize" {
+ les = append(les, nncp.LE{K: "Size", V: le.V})
+ break
+ }
+ }
+ ctx.LogI("nncp-bundle", les, "")
}
}
}
log.Fatalln("Invalid NODE specified:", err)
}
if len(node.Calls) == 0 {
- ctx.LogD("caller", nncp.SDS{"node": node.Id}, "has no calls, skipping")
+ ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping")
continue
}
nodes = append(nodes, node)
} else {
for _, node := range ctx.Neigh {
if len(node.Calls) == 0 {
- ctx.LogD("caller", nncp.SDS{"node": node.Id}, "has no calls, skipping")
+ ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping")
continue
}
nodes = append(nodes, node)
} else {
addrs = append(addrs, *call.Addr)
}
- sds := nncp.SDS{"node": node.Id, "callindex": i}
+ les := nncp.LEs{{K: "Node", V: node.Id}, {K: "CallIndex", V: i}}
for {
n := time.Now()
t := call.Cron.Next(n)
- ctx.LogD("caller", sds, t.String())
+ ctx.LogD("caller", les, t.String())
if t.IsZero() {
- ctx.LogE("caller", sds, errors.New("got zero time"), "")
+ ctx.LogE("caller", les, errors.New("got zero time"), "")
return
}
time.Sleep(t.Sub(n))
node.Lock()
if node.Busy {
node.Unlock()
- ctx.LogD("caller", sds, "busy")
+ ctx.LogD("caller", les, "busy")
continue
} else {
node.Busy = true
node.Unlock()
if call.WhenTxExists && call.Xx != "TRx" {
- ctx.LogD("caller", sds, "checking tx existence")
+ ctx.LogD("caller", les, "checking tx existence")
txExists := false
for job := range ctx.Jobs(node.Id, nncp.TTx) {
job.Fd.Close()
txExists = true
}
if !txExists {
- ctx.LogD("caller", sds, "no tx")
+ ctx.LogD("caller", les, "no tx")
node.Lock()
node.Busy = false
node.Unlock()
Nice: nice,
}
if err := state.StartR(conn); err == nil {
- ctx.LogI("call-start", nncp.SDS{"node": state.Node.Id}, "connected")
+ ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected")
nodeIdC <- state.Node.Id
state.Wait()
- ctx.LogI("call-finish", nncp.SDS{
- "node": state.Node.Id,
- "duration": int64(state.Duration.Seconds()),
- "rxbytes": state.RxBytes,
- "txbytes": state.TxBytes,
- "rxspeed": state.RxSpeed,
- "txspeed": state.TxSpeed,
+ ctx.LogI("call-finish", nncp.LEs{
+ {K: "Node", V: state.Node.Id},
+ {K: "Duration", V: int64(state.Duration.Seconds())},
+ {K: "RxBytes", V: state.RxBytes},
+ {K: "TxBytes", V: state.TxBytes},
+ {K: "RxSpeed", V: state.RxSpeed},
+ {K: "TxSpeed", V: state.TxSpeed},
}, "")
} else {
nodeId := "unknown"
nodeIdC <- state.Node.Id
nodeId = state.Node.Id.String()
}
- ctx.LogE("call-start", nncp.SDS{"node": nodeId}, err, "")
+ ctx.LogI("call-start", nncp.LEs{{K: "Node", V: nodeId}}, "connected")
}
close(nodeIdC)
}
if err != nil {
log.Fatalln("Can not accept connection:", err)
}
- ctx.LogD("daemon", nncp.SDS{"addr": conn.RemoteAddr()}, "accepted")
+ ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted")
go func(conn net.Conn) {
nodeIdC := make(chan *nncp.NodeId)
go performSP(ctx, conn, nice, nodeIdC)
package main
import (
- "bufio"
"flag"
"fmt"
+ "io"
"log"
"os"
"go.cypherpunks.ru/nncp/v5"
+ "go.cypherpunks.ru/recfile"
)
func usage() {
if err != nil {
log.Fatalln("Can not open log:", err)
}
- scanner := bufio.NewScanner(fd)
- for scanner.Scan() {
- t := scanner.Text()
+ r := recfile.NewReader(fd)
+ for {
+ le, err := r.NextMap()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ log.Fatalln("Can not read log:", err)
+ }
if *debug {
- fmt.Println(t)
+ fmt.Println(le)
}
- fmt.Println(ctx.Humanize(t))
- }
- if err = scanner.Err(); err != nil {
- log.Fatalln("Can not read log:", err)
+ s, err := ctx.Humanize(le)
+ if err != nil {
+ s = fmt.Sprintf("Can not humanize: %s\n%s", err, le)
+ }
+ fmt.Println(s)
}
}
log.Fatalln("Can not open file:", err)
}
var metaPkt nncp.ChunkedMeta
+ les := nncp.LEs{{K: "Path", V: path}}
if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil {
- ctx.LogE("nncp-reass", nncp.SDS{"path": path}, err, "bad meta file")
+ ctx.LogE("nncp-reass", les, err, "bad meta file")
return false
}
fd.Close() // #nosec G104
if metaPkt.Magic != nncp.MagicNNCPMv1 {
- ctx.LogE("nncp-reass", nncp.SDS{"path": path}, nncp.BadMagic, "")
+ ctx.LogE("nncp-reass", les, nncp.BadMagic, "")
return false
}
metaName := filepath.Base(path)
if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) {
- ctx.LogE("nncp-reass", nncp.SDS{"path": path}, errors.New("invalid filename suffix"), "")
+ ctx.LogE("nncp-reass", les, errors.New("invalid filename suffix"), "")
return false
}
mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta)
allChunksExist := true
for chunkNum, chunkPath := range chunksPaths {
fi, err := os.Stat(chunkPath)
+ lesChunk := append(les, nncp.LE{K: "Chunk", V: chunkNum})
if err != nil && os.IsNotExist(err) {
- ctx.LogI("nncp-reass", nncp.SDS{"path": path, "chunk": chunkNum}, "missing")
+ ctx.LogI("nncp-reass", lesChunk, "missing")
allChunksExist = false
continue
}
badSize = uint64(fi.Size()) != metaPkt.ChunkSize
}
if badSize {
- ctx.LogE(
- "nncp-reass",
- nncp.SDS{"path": path, "chunk": chunkNum},
- errors.New("invalid size"), "",
- )
+ ctx.LogE("nncp-reass", lesChunk, errors.New("invalid size"), "")
allChunksExist = false
}
}
}
if _, err = nncp.CopyProgressed(
hsh, bufio.NewReader(fd), "check",
- nncp.SDS{
- "pkt": chunkPath,
- "fullsize": fi.Size(),
+ nncp.LEs{
+ {K: "Pkt", V: chunkPath},
+ {K: "FullSize", V: fi.Size()},
},
ctx.ShowPrgrs,
); err != nil {
if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 {
ctx.LogE(
"nncp-reass",
- nncp.SDS{"path": path, "chunk": chunkNum},
+ nncp.LEs{{K: "Path", V: path}, {K: "Chunk", V: chunkNum}},
errors.New("checksum is bad"), "",
)
allChecksumsGood = false
return false
}
if dryRun {
- ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "ready")
+ ctx.LogI("nncp-reass", nncp.LEs{{K: "path", V: path}}, "ready")
return true
}
var dst io.Writer
var tmp *os.File
- var sds nncp.SDS
if stdout {
dst = os.Stdout
- sds = nncp.SDS{"path": path}
+ les = nncp.LEs{{K: "path", V: path}}
} else {
tmp, err = nncp.TempFile(mainDir, "reass")
if err != nil {
log.Fatalln(err)
}
- sds = nncp.SDS{"path": path, "tmp": tmp.Name()}
- ctx.LogD("nncp-reass", sds, "created")
+ les = nncp.LEs{{K: "path", V: path}, {K: "Tmp", V: tmp.Name()}}
+ ctx.LogD("nncp-reass", les, "created")
dst = tmp
}
dstW := bufio.NewWriter(dst)
}
if _, err = nncp.CopyProgressed(
dstW, bufio.NewReader(fd), "reass",
- nncp.SDS{
- "pkt": chunkPath,
- "fullsize": fi.Size(),
+ nncp.LEs{
+ {K: "Pkt", V: chunkPath},
+ {K: "FullSize", V: fi.Size()},
},
ctx.ShowPrgrs,
); err != nil {
fd.Close() // #nosec G104
if !keep {
if err = os.Remove(chunkPath); err != nil {
- ctx.LogE("nncp-reass", nncp.SdsAdd(sds, nncp.SDS{"chunk": chunkNum}), err, "")
+ ctx.LogE("nncp-reass", append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, "")
hasErrors = true
}
}
log.Fatalln("Can not close:", err)
}
}
- ctx.LogD("nncp-reass", sds, "written")
+ ctx.LogD("nncp-reass", les, "written")
if !keep {
if err = os.Remove(path); err != nil {
- ctx.LogE("nncp-reass", sds, err, "")
+ ctx.LogE("nncp-reass", les, err, "")
hasErrors = true
}
}
if stdout {
- ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "done")
+ ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done")
return !hasErrors
}
if err = nncp.DirSync(mainDir); err != nil {
log.Fatalln(err)
}
- ctx.LogI("nncp-reass", nncp.SDS{"path": path}, "done")
+ ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done")
return !hasErrors
}
dir, err := os.Open(dirPath)
defer dir.Close()
if err != nil {
- ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath}, err, "")
+ ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "")
return nil
}
fis, err := dir.Readdir(0)
dir.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-reass", nncp.SDS{"path": dirPath}, err, "")
+ ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "")
return nil
}
metaPaths := make([]string, 0)
return nil
}
if now.Sub(info.ModTime()) < oldBoundary {
- ctx.LogD("nncp-rm", nncp.SDS{"file": path}, "too fresh, skipping")
+ ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping")
return nil
}
- ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "")
+ ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
return nil
}
if strings.HasSuffix(info.Name(), ".lock") {
- ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "")
+ ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
return nil
}
if now.Sub(info.ModTime()) < oldBoundary {
- ctx.LogD("nncp-rm", nncp.SDS{"file": path}, "too fresh, skipping")
+ ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping")
return nil
}
if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) {
- ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "")
+ ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
return os.Remove(path)
}
if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) {
- ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "")
+ ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
return os.Remove(path)
}
if *pktRaw != "" && filepath.Base(info.Name()) == *pktRaw {
- ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "")
+ ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
!*doPart &&
(*doRx || *doTx) &&
((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) {
- ctx.LogI("nncp-rm", nncp.SDS{"file": path}, "")
+ ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "")
if *dryRun {
return nil
}
isBad := false
var dir *os.File
var fis []os.FileInfo
- sds := nncp.SDS{}
+ var les nncp.LEs
if *txOnly {
goto Tx
}
- sds["xx"] = string(nncp.TRx)
- sds["dir"] = selfPath
- ctx.LogD("nncp-xfer", sds, "self")
+ les = nncp.LEs{
+ {K: "XX", V: string(nncp.TRx)},
+ {K: "Dir", V: selfPath},
+ }
+ ctx.LogD("nncp-xfer", les, "self")
if _, err = os.Stat(selfPath); err != nil {
if os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", sds, "no dir")
+ ctx.LogD("nncp-xfer", les, "no dir")
goto Tx
}
- ctx.LogE("nncp-xfer", sds, err, "stat")
+ ctx.LogE("nncp-xfer", les, err, "stat")
isBad = true
goto Tx
}
dir, err = os.Open(selfPath)
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "open")
+ ctx.LogE("nncp-xfer", les, err, "open")
isBad = true
goto Tx
}
fis, err = dir.Readdir(0)
dir.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "read")
+ ctx.LogE("nncp-xfer", les, err, "read")
isBad = true
goto Tx
}
continue
}
nodeId, err := nncp.NodeIdFromString(fi.Name())
- sds["node"] = fi.Name()
+ les := append(les, nncp.LE{K: "Node", V: fi.Name()})
if err != nil {
- ctx.LogD("nncp-xfer", sds, "is not NodeId")
+ ctx.LogD("nncp-xfer", les, "is not NodeId")
continue
}
if nodeOnly != nil && *nodeId != *nodeOnly.Id {
- ctx.LogD("nncp-xfer", sds, "skip")
+ ctx.LogD("nncp-xfer", les, "skip")
continue
}
if _, known := ctx.Neigh[*nodeId]; !known {
- ctx.LogD("nncp-xfer", sds, "unknown")
+ ctx.LogD("nncp-xfer", les, "unknown")
continue
}
dir, err = os.Open(filepath.Join(selfPath, fi.Name()))
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "open")
+ ctx.LogE("nncp-xfer", les, err, "open")
isBad = true
continue
}
fisInt, err := dir.Readdir(0)
dir.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "read")
+ ctx.LogE("nncp-xfer", les, err, "read")
isBad = true
continue
}
continue
}
filename := filepath.Join(dir.Name(), fiInt.Name())
- sds["file"] = filename
- delete(sds, "size")
+ les := append(les, nncp.LE{K: "File", V: filename})
fd, err := os.Open(filename)
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "open")
+ ctx.LogE("nncp-xfer", les, err, "open")
isBad = true
continue
}
var pktEnc nncp.PktEnc
_, err = xdr.Unmarshal(fd, &pktEnc)
if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 {
- ctx.LogD("nncp-xfer", sds, "is not a packet")
+ ctx.LogD("nncp-xfer", les, "is not a packet")
fd.Close() // #nosec G104
continue
}
if pktEnc.Nice > nice {
- ctx.LogD("nncp-xfer", sds, "too nice")
+ ctx.LogD("nncp-xfer", les, "too nice")
fd.Close() // #nosec G104
continue
}
- sds["size"] = fiInt.Size()
+ les = append(les, nncp.LE{K: "Size", V: fiInt.Size()})
if !ctx.IsEnoughSpace(fiInt.Size()) {
- ctx.LogE("nncp-xfer", sds, errors.New("is not enough space"), "")
+ ctx.LogE("nncp-xfer", les, errors.New("is not enough space"), "")
fd.Close() // #nosec G104
continue
}
err = w.Close()
}
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "copy")
+ ctx.LogE("nncp-xfer", les, err, "copy")
w.CloseWithError(err) // #nosec G104
}
}()
if _, err = nncp.CopyProgressed(
tmp.W, r, "Rx",
- nncp.SdsAdd(sds, nncp.SDS{
- "pkt": filename,
- "fullsize": sds["size"],
- }),
+ append(les, nncp.LEs{
+ {K: "Pkt", V: filename},
+ {K: "FullSize", V: fiInt.Size()},
+ }...),
ctx.ShowPrgrs,
); err != nil {
- ctx.LogE("nncp-xfer", sds, err, "copy")
+ ctx.LogE("nncp-xfer", les, err, "copy")
isBad = true
}
fd.Close() // #nosec G104
)); err != nil {
log.Fatalln(err)
}
- ctx.LogI("nncp-xfer", sds, "")
+ ctx.LogI("nncp-xfer", les, "")
if !*keep {
if err = os.Remove(filename); err != nil {
- ctx.LogE("nncp-xfer", sds, err, "remove")
+ ctx.LogE("nncp-xfer", les, err, "remove")
isBad = true
}
}
}
return
}
- sds["xx"] = string(nncp.TTx)
- for nodeId, _ := range ctx.Neigh {
- sds["node"] = nodeId
+ for nodeId := range ctx.Neigh {
+ les := nncp.LEs{
+ {K: "XX", V: string(nncp.TTx)},
+ {K: "Node", V: nodeId},
+ }
if nodeOnly != nil && nodeId != *nodeOnly.Id {
- ctx.LogD("nncp-xfer", sds, "skip")
+ ctx.LogD("nncp-xfer", les, "skip")
continue
}
dirLock, err := ctx.LockDir(&nodeId, string(nncp.TTx))
continue
}
nodePath := filepath.Join(flag.Arg(0), nodeId.String())
- sds["dir"] = nodePath
+ les = append(les, nncp.LE{K: "Dir", V: nodePath})
_, err = os.Stat(nodePath)
if err != nil {
if os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", sds, "does not exist")
+ ctx.LogD("nncp-xfer", les, "does not exist")
if !*mkdir {
ctx.UnlockDir(dirLock)
continue
}
if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", sds, err, "mkdir")
+ ctx.LogE("nncp-xfer", les, err, "mkdir")
isBad = true
continue
}
} else {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", sds, err, "stat")
+ ctx.LogE("nncp-xfer", les, err, "stat")
isBad = true
continue
}
}
dstPath := filepath.Join(nodePath, ctx.SelfId.String())
- sds["dir"] = dstPath
+ les[len(les)-1].V = dstPath
_, err = os.Stat(dstPath)
if err != nil {
if os.IsNotExist(err) {
if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", sds, err, "mkdir")
+ ctx.LogE("nncp-xfer", les, err, "mkdir")
isBad = true
continue
}
} else {
ctx.UnlockDir(dirLock)
- ctx.LogE("nncp-xfer", sds, err, "stat")
+ ctx.LogE("nncp-xfer", les, err, "stat")
isBad = true
continue
}
}
- delete(sds, "dir")
+ les = les[:len(les)-1]
for job := range ctx.Jobs(&nodeId, nncp.TTx) {
pktName := filepath.Base(job.Fd.Name())
- sds["pkt"] = pktName
+ les := append(les, nncp.LE{K: "Pkt", V: pktName})
if job.PktEnc.Nice > nice {
- ctx.LogD("nncp-xfer", sds, "too nice")
+ ctx.LogD("nncp-xfer", les, "too nice")
job.Fd.Close() // #nosec G104
continue
}
if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", sds, "already exists")
+ ctx.LogD("nncp-xfer", les, "already exists")
job.Fd.Close() // #nosec G104
continue
}
if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) {
- ctx.LogD("nncp-xfer", sds, "already exists")
+ ctx.LogD("nncp-xfer", les, "already exists")
job.Fd.Close() // #nosec G104
continue
}
tmp, err := nncp.TempFile(dstPath, "xfer")
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "mktemp")
+ ctx.LogE("nncp-xfer", les, err, "mktemp")
job.Fd.Close() // #nosec G104
isBad = true
break
}
- sds["tmp"] = tmp.Name()
- ctx.LogD("nncp-xfer", sds, "created")
+ les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()})
+ ctx.LogD("nncp-xfer", les, "created")
bufW := bufio.NewWriter(tmp)
copied, err := nncp.CopyProgressed(
bufW, bufio.NewReader(job.Fd), "Tx",
- nncp.SdsAdd(sds, nncp.SDS{"fullsize": job.Size}),
+ append(les, nncp.LE{K: "FullSize", V: job.Size}),
ctx.ShowPrgrs,
)
job.Fd.Close() // #nosec G104
if err != nil {
- ctx.LogE("nncp-xfer", sds, err, "copy")
+ ctx.LogE("nncp-xfer", les, err, "copy")
tmp.Close() // #nosec G104
isBad = true
continue
}
if err = bufW.Flush(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("nncp-xfer", sds, err, "flush")
+ ctx.LogE("nncp-xfer", les, err, "flush")
isBad = true
continue
}
if err = tmp.Sync(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("nncp-xfer", sds, err, "sync")
+ ctx.LogE("nncp-xfer", les, err, "sync")
isBad = true
continue
}
if err = tmp.Close(); err != nil {
- ctx.LogE("nncp-xfer", sds, err, "sync")
+ ctx.LogE("nncp-xfer", les, err, "sync")
}
if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil {
- ctx.LogE("nncp-xfer", sds, err, "rename")
+ ctx.LogE("nncp-xfer", les, err, "rename")
isBad = true
continue
}
if err = nncp.DirSync(dstPath); err != nil {
- ctx.LogE("nncp-xfer", sds, err, "sync")
+ ctx.LogE("nncp-xfer", les, err, "sync")
isBad = true
continue
}
os.Remove(filepath.Join(dstPath, pktName+".part")) // #nosec G104
- delete(sds, "tmp")
- ctx.LogI("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"size": copied}), "")
+ les = les[:len(les)-1]
+ ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "")
if !*keep {
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("nncp-xfer", sds, err, "remove")
+ ctx.LogE("nncp-xfer", les, err, "remove")
isBad = true
}
}
func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error {
dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
if err := os.MkdirAll(dirPath, os.FileMode(0777)); err != nil {
- ctx.LogE("dir-ensure", SDS{"dir": dirPath}, err, "")
+ ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "")
return err
}
fd, err := os.Open(dirPath)
if err != nil {
- ctx.LogE("dir-ensure", SDS{"dir": dirPath}, err, "")
+ ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "")
return err
}
return fd.Close()
github.com/klauspost/compress v1.11.4
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
go.cypherpunks.ru/balloon v1.1.1
+ go.cypherpunks.ru/recfile v0.4.3
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
golang.org/x/sys v0.0.0-20210105210732-16f7687f5001
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
go.cypherpunks.ru/balloon v1.1.1 h1:ypHM1DRf/XuCrp9pDkTHg00CqZX/Np/APb//iHvDJTA=
go.cypherpunks.ru/balloon v1.1.1/go.mod h1:k4s4ozrIrhpBjj78Z7LX8ZHxMQ+XE7DZUWl8gP2ojCo=
+go.cypherpunks.ru/recfile v0.4.3 h1:ephokihmV//p0ob6gx2FWXvm28/NBDbWTOJPUNahxO8=
+go.cypherpunks.ru/recfile v0.4.3/go.mod h1:sR+KajB+vzofL3SFVFwKt3Fke0FaCcN1g3YPNAhU3qI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
package nncp
import (
+ "errors"
"fmt"
- "regexp"
"strconv"
"strings"
"time"
"github.com/dustin/go-humanize"
+ "go.cypherpunks.ru/recfile"
)
-func (ctx *Ctx) Humanize(s string) string {
- s = strings.TrimRight(s, "\n")
- splitted := strings.SplitN(s, " ", 4)
- if len(splitted) != 4 {
- return s
- }
- var level string
- if splitted[0] == "E" {
- level = "ERROR "
- }
- when, err := time.Parse(time.RFC3339Nano, splitted[1])
+func (ctx *Ctx) HumanizeRec(rec string) string {
+ r := recfile.NewReader(strings.NewReader(rec))
+ le, err := r.NextMap()
if err != nil {
- return s
+ return rec
}
- who := splitted[2][1:]
- closingBracket := strings.LastIndex(splitted[3], "]")
- if closingBracket == -1 {
- return s
- }
- rem := strings.Trim(splitted[3][closingBracket+1:], " ")
- sds := make(map[string]string)
-
- re := regexp.MustCompile(`\w+="[^"]+"`)
- for _, pair := range re.FindAllString(splitted[3][:closingBracket+1], -1) {
- sep := strings.Index(pair, "=")
- sds[pair[:sep]] = pair[sep+2 : len(pair)-1]
+ humanized, err := ctx.Humanize(le)
+ if err != nil {
+ return fmt.Sprintf("Can not humanize: %s\n%s", err, rec)
}
+ return humanized
+}
- nodeS := sds["node"]
+func (ctx *Ctx) Humanize(le map[string]string) (string, error) {
+ nodeS := le["Node"]
node, err := ctx.FindNode(nodeS)
if err == nil {
nodeS = node.Name
}
var size string
- if sizeRaw, exists := sds["size"]; exists {
+ if sizeRaw, exists := le["Size"]; exists {
sp, err := strconv.ParseUint(sizeRaw, 10, 64)
if err != nil {
- return s
+ return "", err
}
size = humanize.IBytes(uint64(sp))
}
var msg string
- switch who {
+ switch le["Who"] {
case "tx":
- switch sds["type"] {
+ switch le["Type"] {
case "file":
msg = fmt.Sprintf(
"File %s (%s) transfer to %s:%s: %s",
- sds["src"], size, nodeS, sds["dst"], rem,
+ le["Src"], size, nodeS, le["Dst"], le["Msg"],
)
case "freq":
msg = fmt.Sprintf(
"File request from %s:%s to %s: %s",
- nodeS, sds["src"], sds["dst"], rem,
+ nodeS, le["Src"], le["Dst"], le["Msg"],
)
case "exec":
msg = fmt.Sprintf(
"Exec to %s@%s (%s): %s",
- nodeS, sds["dst"], size, rem,
+ nodeS, le["Dst"], size, le["Msg"],
)
case "trns":
msg = fmt.Sprintf(
"Transitional packet to %s (%s) (nice %s): %s",
- nodeS, size, sds["nice"], rem,
+ nodeS, size, le["Nice"], le["Msg"],
)
default:
- return s
+ return "", errors.New("unknown \"tx\" type")
}
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
}
case "rx":
- switch sds["type"] {
+ switch le["Type"] {
case "exec":
- msg = fmt.Sprintf(
- "Got exec from %s to %s (%s)",
- nodeS, sds["dst"], size,
- )
+ msg = fmt.Sprintf("Got exec from %s to %s (%s)", nodeS, le["Dst"], size)
case "file":
- msg = fmt.Sprintf("Got file %s (%s) from %s", sds["dst"], size, nodeS)
+ msg = fmt.Sprintf("Got file %s (%s) from %s", le["Dst"], size, nodeS)
case "freq":
- msg = fmt.Sprintf("Got file request %s to %s", sds["src"], nodeS)
+ msg = fmt.Sprintf("Got file request %s to %s", le["Src"], nodeS)
case "trns":
- nodeT := sds["dst"]
+ nodeT := le["Dst"]
node, err := ctx.FindNode(nodeT)
if err == nil {
nodeT = node.Name
nodeS, nodeT, size,
)
default:
- return s
+ return "", errors.New("unknown \"rx\" type")
}
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
}
case "check":
- msg = fmt.Sprintf("Checking: %s/%s/%s", sds["node"], sds["xx"], sds["pkt"])
- if err, exists := sds["err"]; exists {
+ msg = fmt.Sprintf("Checking: %s/%s/%s", le["Node"], le["XX"], le["Pkt"])
+ if err, exists := le["Err"]; exists {
msg += fmt.Sprintf(" %s", err)
}
case "nncp-xfer":
- switch sds["xx"] {
+ switch le["XX"] {
case "rx":
msg = "Packet transfer, received from"
case "tx":
msg = "Packet transfer, sent to"
default:
- return s
+ return "", errors.New("unknown XX")
}
if nodeS != "" {
msg += " node " + nodeS
if size != "" {
msg += fmt.Sprintf(" (%s)", size)
}
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
} else {
- msg += " " + rem
+ msg += " " + le["Msg"]
}
case "nncp-bundle":
- switch sds["xx"] {
+ switch le["XX"] {
case "rx":
msg = "Bundle transfer, received from"
case "tx":
msg = "Bundle transfer, sent to"
default:
- return s
+ return "", errors.New("unknown XX")
}
if nodeS != "" {
msg += " node " + nodeS
}
- msg += " " + sds["pkt"]
+ msg += " " + le["Pkt"]
if size != "" {
msg += fmt.Sprintf(" (%s)", size)
}
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
}
case "nncp-rm":
- msg += "removing " + sds["file"]
+ msg += "removing " + le["File"]
case "call-start":
msg = fmt.Sprintf("Connection to %s", nodeS)
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
}
case "call-finish":
- rx, err := strconv.ParseUint(sds["rxbytes"], 10, 64)
+ rx, err := strconv.ParseUint(le["RxBytes"], 10, 64)
if err != nil {
- return s
+ return "", err
}
- rxs, err := strconv.ParseUint(sds["rxspeed"], 10, 64)
+ rxs, err := strconv.ParseUint(le["RxSpeed"], 10, 64)
if err != nil {
- return s
+ return "", err
}
- tx, err := strconv.ParseUint(sds["txbytes"], 10, 64)
+ tx, err := strconv.ParseUint(le["TxBytes"], 10, 64)
if err != nil {
- return s
+ return "", err
}
- txs, err := strconv.ParseUint(sds["txspeed"], 10, 64)
+ txs, err := strconv.ParseUint(le["TxSpeed"], 10, 64)
if err != nil {
- return s
+ return "", err
}
msg = fmt.Sprintf(
"Finished call with %s: %s received (%s/sec), %s transferred (%s/sec)",
case "sp-start":
if nodeS == "" {
msg += "SP"
- if peer, exists := sds["peer"]; exists {
+ if peer, exists := le["Peer"]; exists {
msg += fmt.Sprintf(": %s", peer)
}
} else {
- nice, err := NicenessParse(sds["nice"])
+ nice, err := NicenessParse(le["Nice"])
if err != nil {
- return s
+ return "", err
}
msg += fmt.Sprintf("SP with %s (nice %s)", nodeS, NicenessFmt(nice))
}
- if len(rem) > 0 {
- msg += ": " + rem
+ if m, exists := le["Msg"]; exists {
+ msg += ": " + m
}
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
}
case "sp-info":
- nice, err := NicenessParse(sds["nice"])
+ nice, err := NicenessParse(le["Nice"])
if err != nil {
- return s
+ return "", err
}
msg = fmt.Sprintf(
"Packet %s (%s) (nice %s)",
- sds["pkt"],
- size,
- NicenessFmt(nice),
+ le["Pkt"], size, NicenessFmt(nice),
)
- offsetParsed, err := strconv.ParseUint(sds["offset"], 10, 64)
+ offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64)
if err != nil {
- return s
+ return "", err
}
- sizeParsed, err := strconv.ParseUint(sds["size"], 10, 64)
+ sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
if err != nil {
- return s
+ return "", err
}
msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed)
- if len(rem) > 0 {
- msg += ": " + rem
+ if m, exists := le["Msg"]; exists {
+ msg += ": " + m
}
case "sp-infos":
- switch sds["xx"] {
+ switch le["XX"] {
case "rx":
msg = fmt.Sprintf("%s has got for us: ", nodeS)
case "tx":
msg = fmt.Sprintf("We have got for %s: ", nodeS)
default:
- return s
+ return "", errors.New("unknown XX")
}
- msg += fmt.Sprintf("%s packets, %s", sds["pkts"], size)
+ msg += fmt.Sprintf("%s packets, %s", le["Pkts"], size)
case "sp-process":
- msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, sds["pkt"], size, rem)
+ msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, le["Pkt"], size, le["Msg"])
case "sp-file":
- switch sds["xx"] {
+ switch le["XX"] {
case "rx":
msg = "Got packet "
case "tx":
msg = "Sent packet "
default:
- return s
+ return "", errors.New("unknown XX")
}
- fullsize, err := strconv.ParseUint(sds["fullsize"], 10, 64)
+ fullsize, err := strconv.ParseUint(le["FullSize"], 10, 64)
if err != nil {
- return s
+ return "", err
}
- sizeParsed, err := strconv.ParseUint(sds["size"], 10, 64)
+ sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64)
if err != nil {
- return s
+ return "", err
}
msg += fmt.Sprintf(
"%s %d%% (%s / %s)",
- sds["pkt"],
+ le["Pkt"],
100*sizeParsed/fullsize,
humanize.IBytes(uint64(sizeParsed)),
humanize.IBytes(uint64(fullsize)),
)
case "sp-done":
- switch sds["xx"] {
+ switch le["XX"] {
case "rx":
- msg = fmt.Sprintf("Packet %s is retreived (%s)", sds["pkt"], size)
+ msg = fmt.Sprintf("Packet %s is retreived (%s)", le["Pkt"], size)
case "tx":
- msg = fmt.Sprintf("Packet %s is sent", sds["pkt"])
+ msg = fmt.Sprintf("Packet %s is sent", le["Pkt"])
default:
- return s
+ return "", errors.New("unknown XX")
}
case "nncp-reass":
- chunkNum, exists := sds["chunk"]
+ chunkNum, exists := le["Chunk"]
if exists {
msg = fmt.Sprintf(
"Reassembling chunked file \"%s\" (chunk %s): %s",
- sds["path"],
- chunkNum,
- rem,
+ le["Path"], chunkNum, le["Msg"],
)
} else {
msg = fmt.Sprintf(
"Reassembling chunked file \"%s\": %s",
- sds["path"],
- rem,
+ le["Path"], le["Msg"],
)
}
- if err, exists := sds["err"]; exists {
+ if err, exists := le["Err"]; exists {
msg += ": " + err
}
case "lockdir":
- msg = fmt.Sprintf("Acquire lock for %s: %s", sds["path"], sds["err"])
+ msg = fmt.Sprintf("Acquire lock for %s: %s", le["Path"], le["Err"])
default:
- return s
+ return "", errors.New("unknown Who")
+ }
+ when, err := time.Parse(time.RFC3339Nano, le["When"])
+ if err != nil {
+ return "", err
+ }
+ var level string
+ if _, isErr := le["Err"]; isErr {
+ level = "ERROR "
}
- return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg)
+ return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg), nil
}
fd.Close() // #nosec G104
continue
}
- ctx.LogD("jobs", SDS{
- "xx": string(xx),
- "node": pktEnc.Sender,
- "name": fi.Name(),
- "nice": int(pktEnc.Nice),
- "size": fi.Size(),
+ ctx.LogD("jobs", LEs{
+ {"XX", string(xx)},
+ {"Node", pktEnc.Sender},
+ {"Name", fi.Name()},
+ {"Nice", int(pktEnc.Nice)},
+ {"Size", fi.Size()},
}, "taken")
job := Job{
PktEnc: &pktEnc,
func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) {
if err := ctx.ensureRxDir(nodeId); err != nil {
- ctx.LogE("lockdir", SDS{}, err, "")
+ ctx.LogE("lockdir", LEs{}, err, "")
return nil, err
}
lockPath := filepath.Join(ctx.Spool, nodeId.String(), lockCtx) + ".lock"
os.FileMode(0666),
)
if err != nil {
- ctx.LogE("lockdir", SDS{"path": lockPath}, err, "")
+ ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "")
return nil, err
}
err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
if err != nil {
- ctx.LogE("lockdir", SDS{"path": lockPath}, err, "")
+ ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "")
dirLock.Close() // #nosec G104
return nil, err
}
package nncp
import (
+ "bytes"
"fmt"
"os"
- "sort"
- "strings"
"time"
+ "go.cypherpunks.ru/recfile"
"golang.org/x/sys/unix"
)
-type LogLevel string
-
-type SDS map[string]interface{}
+type LE struct {
+ K string
+ V interface{}
+}
+type LEs []LE
-func sdFmt(who string, sds SDS) string {
- keys := make([]string, 0, len(sds))
- for k, _ := range sds {
- keys = append(keys, k)
- }
- sort.Strings(keys)
- result := make([]string, 0, 1+len(keys))
- result = append(result, "["+who)
- for _, k := range keys {
- var value string
- switch v := sds[k].(type) {
+func (les LEs) Rec() string {
+ fields := make([]recfile.Field, 0, len(les)+1)
+ fields = append(fields, recfile.Field{
+ Name: "When", Value: time.Now().UTC().Format(time.RFC3339Nano),
+ })
+ var val string
+ for _, le := range les {
+ switch v := le.V.(type) {
case int, int8, uint8, int64, uint64:
- value = fmt.Sprintf("%d", v)
+ val = fmt.Sprintf("%d", v)
default:
- value = fmt.Sprintf("%s", v)
+ val = fmt.Sprintf("%s", v)
}
- result = append(result, fmt.Sprintf(`%s="%s"`, k, value))
+ fields = append(fields, recfile.Field{Name: le.K, Value: val})
}
- return strings.Join(result, " ") + "]"
-}
-
-func msgFmt(level LogLevel, who string, sds SDS, msg string) string {
- result := fmt.Sprintf(
- "%s %s %s",
- level,
- time.Now().UTC().Format(time.RFC3339Nano),
- sdFmt(who, sds),
- )
- if len(msg) > 0 {
- result += " " + msg
+ b := bytes.NewBuffer(make([]byte, 0, 1<<10))
+ w := recfile.NewWriter(b)
+ _, err := w.RecordStart()
+ if err != nil {
+ panic(err)
}
- return result + "\n"
+ _, err = w.WriteFields(fields...)
+ if err != nil {
+ panic(err)
+ }
+ return b.String()
}
-func (ctx *Ctx) Log(msg string) {
+func (ctx *Ctx) Log(rec string) {
fdLock, err := os.OpenFile(
ctx.LogPath+".lock",
os.O_CREATE|os.O_WRONLY,
fmt.Fprintln(os.Stderr, "Can not open log:", err)
return
}
- fd.WriteString(msg) // #nosec G104
+ fd.WriteString(rec) // #nosec G104
fd.Close() // #nosec G104
}
-func (ctx *Ctx) LogD(who string, sds SDS, msg string) {
+func (ctx *Ctx) LogD(who string, les LEs, msg string) {
if !ctx.Debug {
return
}
- fmt.Fprint(os.Stderr, msgFmt(LogLevel("D"), who, sds, msg))
-}
-
-func (ctx *Ctx) LogI(who string, sds SDS, msg string) {
- msg = msgFmt(LogLevel("I"), who, sds, msg)
- if !ctx.Quiet {
- fmt.Fprintln(os.Stderr, ctx.Humanize(msg))
+ les = append(LEs{{"Debug", true}, {"Who", who}}, les...)
+ if msg != "" {
+ les = append(les, LE{"Msg", msg})
}
- ctx.Log(msg)
+ fmt.Fprint(os.Stderr, les.Rec())
}
-func (ctx *Ctx) LogE(who string, sds SDS, err error, msg string) {
- sds["err"] = err.Error()
- msg = msgFmt(LogLevel("E"), who, sds, msg)
- if len(msg) > 2048 {
- msg = msg[:2048]
+func (ctx *Ctx) LogI(who string, les LEs, msg string) {
+ les = append(LEs{{"Who", who}}, les...)
+ if msg != "" {
+ les = append(les, LE{"Msg", msg})
}
- fmt.Fprintln(os.Stderr, ctx.Humanize(msg))
- ctx.Log(msg)
+ rec := les.Rec()
+ if !ctx.Quiet {
+ fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec))
+ }
+ ctx.Log(rec)
}
-func SdsAdd(sds, add SDS) SDS {
- neu := SDS{}
- for k, v := range sds {
- neu[k] = v
+func (ctx *Ctx) LogE(who string, les LEs, err error, msg string) {
+ les = append(LEs{{"Err", err.Error()}, {"Who", who}}, les...)
+ if msg != "" {
+ les = append(les, LE{"Msg", msg})
}
- for k, v := range add {
- neu[k] = v
+ rec := les.Rec()
+ if !ctx.Quiet {
+ fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec))
}
- return neu
+ ctx.Log(rec)
}
)
var (
- Version string = "5.7.0"
+ Version string = "6.0.0"
Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding)
)
dst io.Writer,
src io.Reader,
prgrsPrefix string,
- sds SDS,
+ les LEs,
showPrgrs bool,
) (written int64, err error) {
buf := make([]byte, EncBlkSize)
if nw > 0 {
written += int64(nw)
if showPrgrs {
- sds["size"] = written
- Progress(prgrsPrefix, sds)
+ Progress(prgrsPrefix, append(les, LE{"Size", written}))
}
}
if ew != nil {
return
}
-func Progress(prefix string, sds SDS) {
+func Progress(prefix string, les LEs) {
var size int64
- if sizeI, exists := sds["size"]; exists {
- size = sizeI.(int64)
+ var fullsize int64
+ var pkt string
+ for _, le := range les {
+ switch le.K {
+ case "Size":
+ size = le.V.(int64)
+ case "FullSize":
+ fullsize = le.V.(int64)
+ case "Pkt":
+ pkt = le.V.(string)
+ }
}
- fullsize := sds["fullsize"].(int64)
- pkt := sds["pkt"].(string)
progressBarsLock.RLock()
pb, exists := progressBars[pkt]
progressBarsLock.RUnlock()
var payloads [][]byte
for _, info := range infos {
payloads = append(payloads, MarshalSP(SPTypeInfo, info))
- ctx.LogD("sp-info-our", SDS{
- "node": nodeId,
- "name": Base32Codec.EncodeToString(info.Hash[:]),
- "size": info.Size,
+ ctx.LogD("sp-info-our", LEs{
+ {"Node", nodeId},
+ {"Name", Base32Codec.EncodeToString(info.Hash[:])},
+ {"Size", info.Size},
}, "")
}
if totalSize > 0 {
- ctx.LogI("sp-infos", SDS{
- "xx": string(TTx),
- "node": nodeId,
- "pkts": len(payloads),
- "size": totalSize,
+ ctx.LogI("sp-infos", LEs{
+ {"XX", string(TTx)},
+ {"Node", nodeId},
+ {"Pkts", len(payloads)},
+ {"Size", totalSize},
}, "")
}
return payloadsSplit(payloads)
state.dirUnlock()
return err
}
- sds := SDS{"node": nodeId, "nice": int(state.Nice)}
- state.Ctx.LogD("sp-start", sds, "sending first message")
+ les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
+ state.Ctx.LogD("sp-start", les, "sending first message")
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err = state.WriteSP(conn, buf, false); err != nil {
- state.Ctx.LogE("sp-start", sds, err, "")
+ state.Ctx.LogE("sp-start", les, err, "")
state.dirUnlock()
return err
}
- state.Ctx.LogD("sp-start", sds, "waiting for first message")
+ state.Ctx.LogD("sp-start", les, "waiting for first message")
conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if buf, err = state.ReadSP(conn); err != nil {
- state.Ctx.LogE("sp-start", sds, err, "")
+ state.Ctx.LogE("sp-start", les, err, "")
state.dirUnlock()
return err
}
payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
if err != nil {
- state.Ctx.LogE("sp-start", sds, err, "")
+ state.Ctx.LogE("sp-start", les, err, "")
state.dirUnlock()
return err
}
- state.Ctx.LogD("sp-start", sds, "starting workers")
+ state.Ctx.LogD("sp-start", les, "starting workers")
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
- state.Ctx.LogE("sp-start", sds, err, "")
+ state.Ctx.LogE("sp-start", les, err, "")
state.dirUnlock()
}
return err
state.xxOnly = xxOnly
var buf []byte
var payload []byte
- state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
+ state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if buf, err = state.ReadSP(conn); err != nil {
- state.Ctx.LogE("sp-start", SDS{}, err, "")
+ state.Ctx.LogE("sp-start", LEs{}, err, "")
return err
}
if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
- state.Ctx.LogE("sp-start", SDS{}, err, "")
+ state.Ctx.LogE("sp-start", LEs{}, err, "")
return err
}
}
if node == nil {
peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
- state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
+ state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
return errors.New("Unknown peer: " + peerId)
}
state.Node = node
state.txRate = node.TxRate
state.onlineDeadline = node.OnlineDeadline
state.maxOnlineTime = node.MaxOnlineTime
- sds := SDS{"node": node.Id, "nice": int(state.Nice)}
+ les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
if err = state.Ctx.ensureRxDir(node.Id); err != nil {
return err
firstPayload = append(firstPayload, SPHaltMarshalized...)
}
- state.Ctx.LogD("sp-start", sds, "sending first message")
+ state.Ctx.LogD("sp-start", les, "sending first message")
buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
if err != nil {
state.dirUnlock()
}
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err = state.WriteSP(conn, buf, false); err != nil {
- state.Ctx.LogE("sp-start", sds, err, "")
+ state.Ctx.LogE("sp-start", les, err, "")
state.dirUnlock()
return err
}
- state.Ctx.LogD("sp-start", sds, "starting workers")
+ state.Ctx.LogD("sp-start", les, "starting workers")
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
state.dirUnlock()
infosPayloads [][]byte,
payload []byte,
) error {
- sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
+ les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
state.isDead = make(chan struct{})
if state.maxOnlineTime > 0 {
state.mustFinishAt = state.started.Add(state.maxOnlineTime)
for _, payload := range infosPayloads[1:] {
state.Ctx.LogD(
"sp-work",
- SdsAdd(sds, SDS{"size": len(payload)}),
+ append(les, LE{"Size", len(payload)}),
"queuing remaining payload",
)
state.payloads <- payload
// Processing of first payload and queueing its responses
state.Ctx.LogD(
"sp-work",
- SdsAdd(sds, SDS{"size": len(payload)}),
+ append(les, LE{"Size", len(payload)}),
"processing first payload",
)
replies, err := state.ProcessSP(payload)
if err != nil {
- state.Ctx.LogE("sp-work", sds, err, "")
+ state.Ctx.LogE("sp-work", les, err, "")
return err
}
state.wg.Add(1)
for _, reply := range replies {
state.Ctx.LogD(
"sp-work",
- SdsAdd(sds, SDS{"size": len(reply)}),
+ append(les, LE{"Size", len(reply)}),
"queuing reply",
)
state.payloads <- reply
) {
state.Ctx.LogD(
"sp-work",
- SdsAdd(sds, SDS{"size": len(payload)}),
+ append(les, LE{"Size", len(payload)}),
"queuing new info",
)
state.payloads <- payload
var ping bool
select {
case <-state.pings:
- state.Ctx.LogD("sp-xmit", sds, "got ping")
+ state.Ctx.LogD("sp-xmit", les, "got ping")
payload = SPPingMarshalized
ping = true
case payload = <-state.payloads:
state.Ctx.LogD(
"sp-xmit",
- SdsAdd(sds, SDS{"size": len(payload)}),
+ append(les, LE{"Size", len(payload)}),
"got payload",
)
default:
if state.txRate > 0 {
time.Sleep(time.Second / time.Duration(state.txRate))
}
- sdsp := SdsAdd(sds, SDS{
- "xx": string(TTx),
- "pkt": Base32Codec.EncodeToString(freq.Hash[:]),
- "size": int64(freq.Offset),
- })
- state.Ctx.LogD("sp-file", sdsp, "queueing")
+ lesp := append(les, LEs{
+ {"XX", string(TTx)},
+ {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
+ {"Size", int64(freq.Offset)},
+ }...)
+ state.Ctx.LogD("sp-file", lesp, "queueing")
fd, err := os.Open(filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
Base32Codec.EncodeToString(freq.Hash[:]),
))
if err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
return
}
fi, err := fd.Stat()
if err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
return
}
fullSize := fi.Size()
var buf []byte
if freq.Offset < uint64(fullSize) {
- state.Ctx.LogD("sp-file", sdsp, "seeking")
+ state.Ctx.LogD("sp-file", lesp, "seeking")
if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
return
}
buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
n, err := fd.Read(buf)
if err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
return
}
buf = buf[:n]
- state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
+ state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
}
fd.Close() // #nosec G104
payload = MarshalSP(SPTypeFile, SPFile{
Payload: buf,
})
ourSize := freq.Offset + uint64(len(buf))
- sdsp["size"] = int64(ourSize)
- sdsp["fullsize"] = fullSize
+ lesp = append(lesp, LE{"Size", int64(ourSize)})
+ lesp = append(lesp, LE{"FullSize", fullSize})
if state.Ctx.ShowPrgrs {
- Progress("Tx", sdsp)
+ Progress("Tx", lesp)
}
state.Lock()
if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
if ourSize == uint64(fullSize) {
- state.Ctx.LogD("sp-file", sdsp, "finished")
+ state.Ctx.LogD("sp-file", lesp, "finished")
if len(state.queueTheir) > 1 {
state.queueTheir = state.queueTheir[1:]
} else {
state.queueTheir[0].freq.Offset += uint64(len(buf))
}
} else {
- state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
+ state.Ctx.LogD("sp-file", lesp, "queue disappeared")
}
state.Unlock()
}
- state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending")
+ state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
- state.Ctx.LogE("sp-xmit", sds, err, "")
+ state.Ctx.LogE("sp-xmit", les, err, "")
return
}
}
if state.NotAlive() {
break
}
- state.Ctx.LogD("sp-recv", sds, "waiting for payload")
+ state.Ctx.LogD("sp-recv", les, "waiting for payload")
conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
payload, err := state.ReadSP(conn)
if err != nil {
if unmarshalErr.ErrorCode == xdr.ErrIO {
break
}
- state.Ctx.LogE("sp-recv", sds, err, "")
+ state.Ctx.LogE("sp-recv", les, err, "")
break
}
state.Ctx.LogD(
"sp-recv",
- SdsAdd(sds, SDS{"size": len(payload)}),
+ append(les, LE{"Size", len(payload)}),
"got payload",
)
payload, err = state.csTheir.Decrypt(nil, nil, payload)
if err != nil {
- state.Ctx.LogE("sp-recv", sds, err, "")
+ state.Ctx.LogE("sp-recv", les, err, "")
break
}
state.Ctx.LogD(
"sp-recv",
- SdsAdd(sds, SDS{"size": len(payload)}),
+ append(les, LE{"Size", len(payload)}),
"processing",
)
replies, err := state.ProcessSP(payload)
if err != nil {
- state.Ctx.LogE("sp-recv", sds, err, "")
+ state.Ctx.LogE("sp-recv", les, err, "")
break
}
state.wg.Add(1)
for _, reply := range replies {
state.Ctx.LogD(
"sp-recv",
- SdsAdd(sds, SDS{"size": len(reply)}),
+ append(les, LE{"Size", len(reply)}),
"queuing reply",
)
state.payloads <- reply
}
func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
- sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
+ les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
r := bytes.NewReader(payload)
var err error
var replies [][]byte
var infosGot bool
for r.Len() > 0 {
- state.Ctx.LogD("sp-process", sds, "unmarshaling header")
+ state.Ctx.LogD("sp-process", les, "unmarshaling header")
var head SPHead
if _, err = xdr.Unmarshal(r, &head); err != nil {
- state.Ctx.LogE("sp-process", sds, err, "")
+ state.Ctx.LogE("sp-process", les, err, "")
return nil, err
}
if head.Type != SPTypePing {
}
switch head.Type {
case SPTypeHalt:
- state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
+ state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
state.Lock()
state.queueTheir = nil
state.Unlock()
case SPTypePing:
- state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "")
+ state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
case SPTypeInfo:
infosGot = true
- sdsp := SdsAdd(sds, SDS{"type": "info"})
- state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ lesp := append(les, LE{"Type", "info"})
+ state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
var info SPInfo
if _, err = xdr.Unmarshal(r, &info); err != nil {
- state.Ctx.LogE("sp-process", sdsp, err, "")
+ state.Ctx.LogE("sp-process", lesp, err, "")
return nil, err
}
- sdsp = SdsAdd(sds, SDS{
- "pkt": Base32Codec.EncodeToString(info.Hash[:]),
- "size": int64(info.Size),
- "nice": int(info.Nice),
- })
+ lesp = append(lesp, LEs{
+ {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
+ {"Size", int64(info.Size)},
+ {"Nice", int(info.Nice)},
+ }...)
if !state.listOnly && info.Nice > state.Nice {
- state.Ctx.LogD("sp-process", sdsp, "too nice")
+ state.Ctx.LogD("sp-process", lesp, "too nice")
continue
}
- state.Ctx.LogD("sp-process", sdsp, "received")
+ state.Ctx.LogD("sp-process", lesp, "received")
if !state.listOnly && state.xxOnly == TTx {
continue
}
state.Lock()
state.infosTheir[*info.Hash] = &info
state.Unlock()
- state.Ctx.LogD("sp-process", sdsp, "stating part")
+ state.Ctx.LogD("sp-process", lesp, "stating part")
pktPath := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
Base32Codec.EncodeToString(info.Hash[:]),
)
if _, err = os.Stat(pktPath); err == nil {
- state.Ctx.LogI("sp-info", sdsp, "already done")
+ state.Ctx.LogI("sp-info", lesp, "already done")
if !state.listOnly {
replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
}
continue
}
if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
- state.Ctx.LogI("sp-info", sdsp, "already seen")
+ state.Ctx.LogI("sp-info", lesp, "already seen")
if !state.listOnly {
replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
}
offset = fi.Size()
}
if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
- state.Ctx.LogI("sp-info", sdsp, "not enough space")
+ state.Ctx.LogI("sp-info", lesp, "not enough space")
continue
}
- state.Ctx.LogI(
- "sp-info",
- SdsAdd(sdsp, SDS{"offset": offset}),
- "",
- )
+ state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
replies = append(replies, MarshalSP(
SPTypeFreq,
))
}
case SPTypeFile:
- sdsp := SdsAdd(sds, SDS{"type": "file"})
- state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ lesp := append(les, LE{"Type", "file"})
+ state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
var file SPFile
if _, err = xdr.Unmarshal(r, &file); err != nil {
- state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
+ state.Ctx.LogE("sp-process", lesp, err, "")
return nil, err
}
- sdsp["xx"] = string(TRx)
- sdsp["pkt"] = Base32Codec.EncodeToString(file.Hash[:])
- sdsp["size"] = len(file.Payload)
+ lesp = append(lesp, LEs{
+ {"XX", string(TRx)},
+ {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
+ {"Size", len(file.Payload)},
+ }...)
dirToSync := filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TRx),
)
filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
- state.Ctx.LogD("sp-file", sdsp, "opening part")
+ state.Ctx.LogD("sp-file", lesp, "opening part")
fd, err := os.OpenFile(
filePath+PartSuffix,
os.O_RDWR|os.O_CREATE,
os.FileMode(0666),
)
if err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
return nil, err
}
- state.Ctx.LogD(
- "sp-file",
- SdsAdd(sdsp, SDS{"offset": file.Offset}),
- "seeking",
- )
+ state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
fd.Close() // #nosec G104
return nil, err
}
- state.Ctx.LogD("sp-file", sdsp, "writing")
+ state.Ctx.LogD("sp-file", lesp, "writing")
_, err = fd.Write(file.Payload)
if err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
fd.Close() // #nosec G104
return nil, err
}
ourSize := int64(file.Offset + uint64(len(file.Payload)))
- sdsp["size"] = ourSize
+ lesp[len(lesp)-1].V = ourSize
fullsize := int64(0)
state.RLock()
infoTheir, ok := state.infosTheir[*file.Hash]
if ok {
fullsize = int64(infoTheir.Size)
}
- sdsp["fullsize"] = fullsize
+ lesp = append(lesp, LE{"FullSize", fullsize})
if state.Ctx.ShowPrgrs {
- Progress("Rx", sdsp)
+ Progress("Rx", lesp)
}
if fullsize != ourSize {
fd.Close() // #nosec G104
spCheckerToken <- struct{}{}
}()
if err := fd.Sync(); err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "sync")
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
fd.Close() // #nosec G104
return
}
defer state.wg.Done()
if _, err = fd.Seek(0, io.SeekStart); err != nil {
fd.Close() // #nosec G104
- state.Ctx.LogE("sp-file", sdsp, err, "")
+ state.Ctx.LogE("sp-file", lesp, err, "")
return
}
- state.Ctx.LogD("sp-file", sdsp, "checking")
- gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
+ state.Ctx.LogD("sp-file", lesp, "checking")
+ gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
fd.Close() // #nosec G104
if err != nil || !gut {
- state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
+ state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
return
}
- state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
+ state.Ctx.LogI("sp-done", lesp, "")
if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "rename")
+ state.Ctx.LogE("sp-file", lesp, err, "rename")
return
}
if err = DirSync(dirToSync); err != nil {
- state.Ctx.LogE("sp-file", sdsp, err, "sync")
+ state.Ctx.LogE("sp-file", lesp, err, "sync")
return
}
state.Lock()
}()
}()
case SPTypeDone:
- sdsp := SdsAdd(sds, SDS{"type": "done"})
- state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ lesp := append(les, LE{"Type", "done"})
+ state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
var done SPDone
if _, err = xdr.Unmarshal(r, &done); err != nil {
- state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
+ state.Ctx.LogE("sp-process", lesp, err, "")
return nil, err
}
- sdsp["pkt"] = Base32Codec.EncodeToString(done.Hash[:])
- state.Ctx.LogD("sp-done", sdsp, "removing")
+ lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
+ state.Ctx.LogD("sp-done", lesp, "removing")
err := os.Remove(filepath.Join(
state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
Base32Codec.EncodeToString(done.Hash[:]),
))
- sdsp["xx"] = string(TTx)
+ lesp = append(lesp, LE{"XX", string(TTx)})
if err == nil {
- state.Ctx.LogI("sp-done", sdsp, "")
+ state.Ctx.LogI("sp-done", lesp, "")
} else {
- state.Ctx.LogE("sp-done", sdsp, err, "")
+ state.Ctx.LogE("sp-done", lesp, err, "")
}
case SPTypeFreq:
- sdsp := SdsAdd(sds, SDS{"type": "freq"})
- state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ lesp := append(les, LE{"Type", "freq"})
+ state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
var freq SPFreq
if _, err = xdr.Unmarshal(r, &freq); err != nil {
- state.Ctx.LogE("sp-process", sdsp, err, "")
+ state.Ctx.LogE("sp-process", lesp, err, "")
return nil, err
}
- sdsp["pkt"] = Base32Codec.EncodeToString(freq.Hash[:])
- sdsp["offset"] = freq.Offset
- state.Ctx.LogD("sp-process", sdsp, "queueing")
+ lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
+ lesp = append(lesp, LE{"Offset", freq.Offset})
+ state.Ctx.LogD("sp-process", lesp, "queueing")
nice, exists := state.infosOurSeen[*freq.Hash]
if exists {
if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
state.Unlock()
} else {
- state.Ctx.LogD("sp-process", sdsp, "skipping")
+ state.Ctx.LogD("sp-process", lesp, "skipping")
}
} else {
- state.Ctx.LogD("sp-process", sdsp, "unknown")
+ state.Ctx.LogD("sp-process", lesp, "unknown")
}
default:
state.Ctx.LogE(
"sp-process",
- SdsAdd(sds, SDS{"type": head.Type}),
+ append(les, LE{"Type", head.Type}),
errors.New("unknown type"),
"",
)
size += info.Size
}
state.RUnlock()
- state.Ctx.LogI("sp-infos", SDS{
- "xx": string(TRx),
- "node": state.Node.Id,
- "pkts": pkts,
- "size": int64(size),
+ state.Ctx.LogI("sp-infos", LEs{
+ {"XX", string(TRx)},
+ {"Node", state.Node.Id},
+ {"Pkts", pkts},
+ {"Size", int64(size)},
}, "")
}
return payloadsSplit(replies), nil
}
fd, err := TempFile(jobsPath, "")
if err == nil {
- ctx.LogD("tmp", SDS{"src": fd.Name()}, "created")
+ ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, "created")
}
return fd, err
}
return err
}
checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil))
- tmp.ctx.LogD("tmp", SDS{"src": tmp.Fd.Name(), "dst": checksum}, "commit")
+ tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit")
if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil {
return err
}
) bool {
dirLock, err := ctx.LockDir(nodeId, "toss")
if err != nil {
- ctx.LogE("rx", SDS{}, err, "lock")
+ ctx.LogE("rx", LEs{}, err, "lock")
return false
}
defer ctx.UnlockDir(dirLock)
defer decompressor.Close()
for job := range ctx.Jobs(nodeId, TRx) {
pktName := filepath.Base(job.Fd.Name())
- sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
+ les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
if job.PktEnc.Nice > nice {
- ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
+ ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
job.Fd.Close() // #nosec G104
continue
}
var pktSize int64
var pktSizeBlocks int64
if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
- ctx.LogE("rx", sds, err, "unmarshal")
+ ctx.LogE("rx", les, err, "unmarshal")
isBad = true
goto Closing
}
pktSize -= poly1305.TagSize
}
pktSize -= pktSizeBlocks * poly1305.TagSize
- sds["size"] = pktSize
- ctx.LogD("rx", sds, "taken")
+ les = append(les, LE{"Size", pktSize})
+ ctx.LogD("rx", les, "taken")
switch pkt.Type {
case PktTypeExec, PktTypeExecFat:
if noExec {
args = append(args, string(p))
}
argsStr := strings.Join(append([]string{handle}, args...), " ")
- sds := SdsAdd(sds, SDS{
- "type": "exec",
- "dst": argsStr,
- })
+ les = append(les, LEs{
+ {"Type", "exec"},
+ {"Dst", argsStr},
+ }...)
sender := ctx.Neigh[*job.PktEnc.Sender]
cmdline, exists := sender.Exec[handle]
if !exists || len(cmdline) == 0 {
- ctx.LogE("rx", sds, errors.New("No handle found"), "")
+ ctx.LogE("rx", les, errors.New("No handle found"), "")
isBad = true
goto Closing
}
}
output, err := cmd.Output()
if err != nil {
- ctx.LogE("rx", sds, err, "handle")
+ ctx.LogE("rx", les, err, "handle")
isBad = true
goto Closing
}
"Exec from %s: %s", sender.Name, argsStr,
), output)
if err = cmd.Run(); err != nil {
- ctx.LogE("rx", sds, err, "notify")
+ ctx.LogE("rx", les, err, "notify")
}
}
}
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", sds, err, "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
}
goto Closing
}
dst := string(pkt.Path[:int(pkt.PathLen)])
- sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
+ les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...)
if filepath.IsAbs(dst) {
- ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
+ ctx.LogE("rx", les, errors.New("non-relative destination path"), "")
isBad = true
goto Closing
}
incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
if incoming == nil {
- ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
+ ctx.LogE("rx", les, errors.New("incoming is not allowed"), "")
isBad = true
goto Closing
}
dir := filepath.Join(*incoming, path.Dir(dst))
if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
- ctx.LogE("rx", sds, err, "mkdir")
+ ctx.LogE("rx", les, err, "mkdir")
isBad = true
goto Closing
}
if !dryRun {
tmp, err := TempFile(dir, "file")
if err != nil {
- ctx.LogE("rx", sds, err, "mktemp")
+ ctx.LogE("rx", les, err, "mktemp")
isBad = true
goto Closing
}
- sds["tmp"] = tmp.Name()
- ctx.LogD("rx", sds, "created")
+ les = append(les, LE{"Tmp", tmp.Name()})
+ ctx.LogD("rx", les, "created")
bufW := bufio.NewWriter(tmp)
if _, err = CopyProgressed(
bufW, pipeR, "Rx file",
- SdsAdd(sds, SDS{"fullsize": sds["size"]}),
+ append(les, LE{"FullSize", pktSize}),
ctx.ShowPrgrs,
); err != nil {
- ctx.LogE("rx", sds, err, "copy")
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
if err = bufW.Flush(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("rx", sds, err, "copy")
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
if err = tmp.Sync(); err != nil {
tmp.Close() // #nosec G104
- ctx.LogE("rx", sds, err, "copy")
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
if err = tmp.Close(); err != nil {
- ctx.LogE("rx", sds, err, "copy")
+ ctx.LogE("rx", les, err, "copy")
isBad = true
goto Closing
}
if os.IsNotExist(err) {
break
}
- ctx.LogE("rx", sds, err, "stat")
+ ctx.LogE("rx", les, err, "stat")
isBad = true
goto Closing
}
dstPathCtr++
}
if err = os.Rename(tmp.Name(), dstPath); err != nil {
- ctx.LogE("rx", sds, err, "rename")
+ ctx.LogE("rx", les, err, "rename")
isBad = true
}
if err = DirSync(*incoming); err != nil {
- ctx.LogE("rx", sds, err, "sync")
+ ctx.LogE("rx", les, err, "sync")
isBad = true
}
- delete(sds, "tmp")
+ les = les[:len(les)-1] // delete Tmp
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", sds, err, "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
if len(sendmail) > 0 && ctx.NotifyFile != nil {
humanize.IBytes(uint64(pktSize)),
), nil)
if err = cmd.Run(); err != nil {
- ctx.LogE("rx", sds, err, "notify")
+ ctx.LogE("rx", les, err, "notify")
}
}
}
}
src := string(pkt.Path[:int(pkt.PathLen)])
if filepath.IsAbs(src) {
- ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
+ ctx.LogE("rx", les, errors.New("non-relative source path"), "")
isBad = true
goto Closing
}
- sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
+ les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...)
dstRaw, err := ioutil.ReadAll(pipeR)
if err != nil {
- ctx.LogE("rx", sds, err, "read")
+ ctx.LogE("rx", les, err, "read")
isBad = true
goto Closing
}
dst := string(dstRaw)
- sds["dst"] = dst
+ les = append(les, LE{"Dst", dst})
sender := ctx.Neigh[*job.PktEnc.Sender]
freqPath := sender.FreqPath
if freqPath == nil {
- ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
+ ctx.LogE("rx", les, errors.New("freqing is not allowed"), "")
isBad = true
goto Closing
}
sender.FreqMaxSize,
)
if err != nil {
- ctx.LogE("rx", sds, err, "tx file")
+ ctx.LogE("rx", les, err, "tx file")
isBad = true
goto Closing
}
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", sds, err, "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
if len(sendmail) > 0 && ctx.NotifyFreq != nil {
"Freq from %s: %s", sender.Name, src,
), nil)
if err = cmd.Run(); err != nil {
- ctx.LogE("rx", sds, err, "notify")
+ ctx.LogE("rx", les, err, "notify")
}
}
}
copy(dst[:], pkt.Path[:int(pkt.PathLen)])
nodeId := NodeId(*dst)
node, known := ctx.Neigh[nodeId]
- sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
+ les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...)
if !known {
- ctx.LogE("rx", sds, errors.New("unknown node"), "")
+ ctx.LogE("rx", les, errors.New("unknown node"), "")
isBad = true
goto Closing
}
- ctx.LogD("rx", sds, "taken")
+ ctx.LogD("rx", les, "taken")
if !dryRun {
if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
- ctx.LogE("rx", sds, err, "tx trns")
+ ctx.LogE("rx", les, err, "tx trns")
isBad = true
goto Closing
}
}
- ctx.LogI("rx", sds, "")
+ ctx.LogI("rx", les, "")
if !dryRun {
if doSeen {
if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
}
}
if err = os.Remove(job.Fd.Name()); err != nil {
- ctx.LogE("rx", sds, err, "remove")
+ ctx.LogE("rx", les, err, "remove")
isBad = true
}
}
default:
- ctx.LogE("rx", sds, errors.New("unknown type"), "")
+ ctx.LogE("rx", les, errors.New("unknown type"), "")
isBad = true
}
Closing:
curSize := size
pipeR, pipeW := io.Pipe()
go func(size int64, src io.Reader, dst io.WriteCloser) {
- ctx.LogD("tx", SDS{
- "node": hops[0].Id,
- "nice": int(nice),
- "size": size,
+ ctx.LogD("tx", LEs{
+ {"Node", hops[0].Id},
+ {"Nice", int(nice)},
+ {"Size", size},
}, "wrote")
errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
dst.Close() // #nosec G104
pipeRPrev = pipeR
pipeR, pipeW = io.Pipe()
go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
- ctx.LogD("tx", SDS{
- "node": node.Id,
- "nice": int(nice),
- "size": size,
+ ctx.LogD("tx", LEs{
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Size", size},
}, "trns wrote")
errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
dst.Close() // #nosec G104
go func() {
_, err := CopyProgressed(
tmp.W, pipeR, "Tx",
- SDS{"pkt": pktName, "fullsize": curSize},
+ LEs{{"Pkt", pktName}, {"FullSize", curSize}},
ctx.ShowPrgrs,
)
errs <- err
return err
}
_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
- sds := SDS{
- "type": "file",
- "node": node.Id,
- "nice": int(nice),
- "src": srcPath,
- "dst": dstPath,
- "size": fileSize,
+ les := LEs{
+ {"Type", "file"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Src", srcPath},
+ {"Dst", dstPath},
+ {"Size", fileSize},
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, "sent")
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, "sent")
}
return err
}
io.TeeReader(reader, hsh),
path,
)
- sds := SDS{
- "type": "file",
- "node": node.Id,
- "nice": int(nice),
- "src": srcPath,
- "dst": path,
- "size": sizeToSend,
+ les := LEs{
+ {"Type", "file"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Src", srcPath},
+ {"Dst", path},
+ {"Size", sizeToSend},
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, "sent")
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, "sent")
return err
}
hsh.Sum(metaPkt.Checksums[chunkNum][:0])
}
metaPktSize := int64(metaBuf.Len())
_, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
- sds := SDS{
- "type": "file",
- "node": node.Id,
- "nice": int(nice),
- "src": srcPath,
- "dst": path,
- "size": metaPktSize,
+ les := LEs{
+ {"Type", "file"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Src", srcPath},
+ {"Dst", path},
+ {"Size", metaPktSize},
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, "sent")
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, "sent")
}
return err
}
src := strings.NewReader(dstPath)
size := int64(src.Len())
_, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
- sds := SDS{
- "type": "freq",
- "node": node.Id,
- "nice": int(nice),
- "replynice": int(replyNice),
- "src": srcPath,
- "dst": dstPath,
+ les := LEs{
+ {"Type", "freq"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"ReplyNice", int(replyNice)},
+ {"Src", srcPath},
+ {"Dst", dstPath},
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, "sent")
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, "sent")
}
return err
}
_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
}
- sds := SDS{
- "type": "exec",
- "node": node.Id,
- "nice": int(nice),
- "replynice": int(replyNice),
- "dst": strings.Join(append([]string{handle}, args...), " "),
- "size": size,
+ les := LEs{
+ {"Type", "exec"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"ReplyNice", int(replyNice)},
+ {"Dst", strings.Join(append([]string{handle}, args...), " ")},
+ {"Size", size},
}
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, "sent")
} else {
- ctx.LogE("tx", sds, err, "sent")
+ ctx.LogE("tx", les, err, "sent")
}
return err
}
func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
- sds := SDS{
- "type": "trns",
- "node": node.Id,
- "nice": int(nice),
- "size": size,
+ les := LEs{
+ {"Type", "trns"},
+ {"Node", node.Id},
+ {"Nice", int(nice)},
+ {"Size", size},
}
- ctx.LogD("tx", sds, "taken")
+ ctx.LogD("tx", les, "taken")
if !ctx.IsEnoughSpace(size) {
err := errors.New("is not enough space")
- ctx.LogE("tx", sds, err, err.Error())
+ ctx.LogE("tx", les, err, err.Error())
return err
}
tmp, err := ctx.NewTmpFileWHash()
}
if _, err = CopyProgressed(
tmp.W, src, "Tx trns",
- SDS{"pkt": node.Id.String(), "fullsize": size},
+ LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
ctx.ShowPrgrs,
); err != nil {
return err
nodePath := filepath.Join(ctx.Spool, node.Id.String())
err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
if err == nil {
- ctx.LogI("tx", sds, "sent")
+ ctx.LogI("tx", les, "sent")
} else {
- ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent")
+ ctx.LogI("tx", append(les, LE{"Err", err}), "sent")
}
os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104
return err