X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=e6e4900a47052ceb81098d3e8d0f453889c48e7a;hb=4a8e79c63ceea6596ccfe3d173ad452389ded81e;hp=94b37ff624893f8560a7d4b0f1f0f3ed827c4eff;hpb=4e08a1c97600e0372680e86a651f916c70e89342;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 94b37ff..e6e4900 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2020 Sergey Matveev +Copyright (C) 2016-2021 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,6 +21,8 @@ import ( "bytes" "crypto/subtle" "errors" + "fmt" + "hash" "io" "os" "path/filepath" @@ -29,7 +31,9 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/flynn/noise" + "golang.org/x/crypto/blake2b" ) const ( @@ -38,6 +42,11 @@ const ( SPHeadOverhead = 4 ) +type SPCheckerQueues struct { + appeared chan *[32]byte + checked chan *[32]byte +} + var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} @@ -56,9 +65,19 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spWorkersGroup sync.WaitGroup + spCheckers = make(map[NodeId]*SPCheckerQueues) ) +type FdAndFullSize struct { + fd *os.File + fullSize int64 +} + +type HasherAndOffset struct { + h hash.Hash + offset uint64 +} + type SPType uint8 const ( @@ -181,6 +200,7 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 + NoCK bool onlineDeadline time.Duration maxOnlineTime time.Duration hs *noise.HandshakeState @@ -212,6 +232,9 @@ type SPState struct { listOnly bool onlyPkts map[[32]byte]bool writeSPBuf bytes.Buffer + fds map[string]FdAndFullSize + fileHashers map[string]*HasherAndOffset + checkerQueues SPCheckerQueues sync.RWMutex } @@ -226,11 +249,16 @@ func (state *SPState) SetDead() { } close(state.isDead) go func() { - for _ = range state.payloads { + for range state.payloads { } }() go func() { - for _ = range state.pings { + for range state.pings { + } + }() + go func() { + for _, s := range state.fds { + s.fd.Close() } }() } @@ -249,6 +277,38 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } +func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { + for hshValue := range appeared { + pktName := Base32Codec.EncodeToString(hshValue[:]) + les := LEs{ + {"XX", string(TRx)}, + {"Node", nodeId}, + {"Pkt", pktName}, + } + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName) + }) + size, err := ctx.CheckNoCK(nodeId, hshValue) + les = append(les, LE{"Size", size}) + if err != nil { + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) + continue + } + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) + go func(hsh *[32]byte) { checked <- hsh }(hshValue) + } +} + func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ @@ -290,7 +350,6 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() // #nosec G104 if job.PktEnc.Nice > nice { continue } @@ -309,19 +368,34 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var payloads [][]byte for _, info := range infos { payloads = append(payloads, MarshalSP(SPTypeInfo, info)) - ctx.LogD("sp-info-our", SDS{ - "node": nodeId, - "name": Base32Codec.EncodeToString(info.Hash[:]), - "size": info.Size, - }, "") + pktName := Base32Codec.EncodeToString(info.Hash[:]) + ctx.LogD("sp-info-our", LEs{ + {"Node", nodeId}, + {"Name", pktName}, + {"Size", info.Size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Our info: %s/tx/%s (%s)", + ctx.NodeName(nodeId), + pktName, + humanize.IBytes(info.Size), + ) + }) } if totalSize > 0 { - ctx.LogI("sp-infos", SDS{ - "xx": string(TTx), - "node": nodeId, - "pkts": len(payloads), - "size": totalSize, - }, "") + ctx.LogI("sp-infos-tx", LEs{ + {"XX", string(TTx)}, + {"Node", nodeId}, + {"Pkts", len(payloads)}, + {"Size", totalSize}, + }, func(les LEs) string { + return fmt.Sprintf( + "We have got for %s: %d packets, %s", + ctx.NodeName(nodeId), + len(payloads), + humanize.IBytes(uint64(totalSize)), + ) + }) } return payloadsSplit(payloads) } @@ -390,31 +464,73 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.dirUnlock() return err } - sds := SDS{"node": nodeId, "nice": int(state.Nice)} - state.Ctx.LogD("sp-start", sds, "sending first message") + les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startI", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "waiting for first message") + state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading Noise message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() } return err @@ -443,16 +559,24 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started state.xxOnly = xxOnly + var buf []byte var payload []byte - state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP nice %s: waiting for first message", + NicenessFmt(state.Nice), + ) + } + les := LEs{{"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startR", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SDS{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", SDS{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } @@ -465,15 +589,16 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } if node == nil { peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "") - return errors.New("Unknown peer: " + peerId) + err = errors.New("unknown peer: " + peerId) + state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg) + return err } state.Node = node state.rxRate = node.RxRate state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - sds := SDS{"node": node.Id, "nice": int(state.Nice)} + les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err @@ -508,7 +633,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error { firstPayload = append(firstPayload, SPHaltMarshalized...) } - state.Ctx.LogD("sp-start", sds, "sending first message") + state.Ctx.LogD("sp-startR-write", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + node.Name, NicenessFmt(state.Nice), + ) + }) buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) if err != nil { state.dirUnlock() @@ -516,11 +646,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", sds, err, "") + state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + node.Name, NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + node.Name, NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() @@ -528,26 +668,75 @@ func (state *SPState) StartR(conn ConnDeadlined) error { return err } +func (state *SPState) closeFd(pth string) { + s, exists := state.fds[pth] + delete(state.fds, pth) + if exists { + s.fd.Close() + } +} + func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, payload []byte, ) error { - sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} + state.fds = make(map[string]FdAndFullSize) + state.fileHashers = make(map[string]*HasherAndOffset) state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) } + // Checker + if !state.NoCK { + queues := spCheckers[*state.Node.Id] + if queues == nil { + queues = &SPCheckerQueues{ + appeared: make(chan *[32]byte), + checked: make(chan *[32]byte), + } + spCheckers[*state.Node.Id] = queues + go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) + } + state.checkerQueues = *queues + go func() { + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice <= state.Nice { + state.checkerQueues.appeared <- job.HshValue + } + } + }() + state.wg.Add(1) + go func() { + defer state.wg.Done() + for { + select { + case <-state.isDead: + return + case hsh := <-state.checkerQueues.checked: + state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + } + } + }() + } + // Remaining handshake payload sending if len(infosPayloads) > 1 { state.wg.Add(1) go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), - "queuing remaining payload", + "sp-queue-remaining", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing remaining payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -556,23 +745,32 @@ func (state *SPState) StartWorkers( } // Processing of first payload and queueing its responses - state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), - "processing first payload", - ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): processing first payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", sds, err, "") + state.Ctx.LogE("sp-process", les, err, logMsg) return err } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": len(reply)}), - "queuing reply", + "sp-queue-reply", + append(les, LE{"Size", int64(len(reply))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- reply } @@ -629,9 +827,15 @@ func (state *SPState) StartWorkers( &state.infosOurSeen, ) { state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": len(payload)}), - "queuing new info", + "sp-queue-info", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing new info (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -654,14 +858,25 @@ func (state *SPState) StartWorkers( var ping bool select { case <-state.pings: - state.Ctx.LogD("sp-xmit", sds, "got ping") + state.Ctx.LogD("sp-got-ping", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got ping", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) payload = SPPingMarshalized ping = true case payload = <-state.payloads: state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": len(payload)}), - "got payload", + "sp-got-payload", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) default: state.RLock() @@ -675,60 +890,107 @@ func (state *SPState) StartWorkers( if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - sdsp := SdsAdd(sds, SDS{ - "xx": string(TTx), - "pkt": Base32Codec.EncodeToString(freq.Hash[:]), - "size": int64(freq.Offset), + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp := append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(freq.Offset)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): tx/%s (%s)", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(freq.Offset), + ) + } + state.Ctx.LogD("sp-queue", lesp, func(les LEs) string { + return logMsg(les) + ": queueing" }) - state.Ctx.LogD("sp-file", sdsp, "queueing") - fd, err := os.Open(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(freq.Hash[:]), - )) - if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") - return - } - fi, err := fd.Stat() - if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") - return + ) + fdAndFullSize, exists := state.fds[pth] + if !exists { + fd, err := os.Open(pth) + if err != nil { + state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening" + }) + return + } + fi, err := fd.Stat() + if err != nil { + state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string { + return logMsg(les) + ": stating" + }) + return + } + fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} + state.fds[pth] = fdAndFullSize } - fullSize := fi.Size() + fd := fdAndFullSize.fd + fullSize := fdAndFullSize.fullSize var buf []byte if freq.Offset < uint64(fullSize) { - state.Ctx.LogD("sp-file", sdsp, "seeking") + state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { + return logMsg(les) + ": seeking" + }) if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") + state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": reading" + }) return } buf = buf[:n] - state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read") + lesp = append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(n)}, + ) + state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string { + return fmt.Sprintf( + "%s: read %s", + logMsg(les), humanize.IBytes(uint64(n)), + ) + }) } - fd.Close() // #nosec G104 + state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - sdsp["size"] = int64(ourSize) - sdsp["fullsize"] = fullSize + lesp = append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(ourSize)}, + LE{"FullSize", fullSize}, + ) if state.Ctx.ShowPrgrs { - Progress("Tx", sdsp) + Progress("Tx", lesp) } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file", sdsp, "finished") + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { @@ -738,14 +1000,23 @@ func (state *SPState) StartWorkers( state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { - state.Ctx.LogD("sp-file", sdsp, "queue disappeared") + state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string { + return logMsg(les) + ": queue disappeared" + }) } state.Unlock() } - state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending %s", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { - state.Ctx.LogE("sp-xmit", sds, err, "") + state.Ctx.LogE("sp-sending", les, err, logMsg) return } } @@ -758,7 +1029,13 @@ func (state *SPState) StartWorkers( if state.NotAlive() { break } - state.Ctx.LogD("sp-recv", sds, "waiting for payload") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for payload", + state.Node.Name, NicenessFmt(state.Nice), + ) + } + state.Ctx.LogD("sp-recv-wait", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { @@ -772,36 +1049,55 @@ func (state *SPState) StartWorkers( if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", sds, err, "") + state.Ctx.LogE("sp-recv-wait", les, err, logMsg) break } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } state.Ctx.LogD( - "sp-recv", - SdsAdd(sds, SDS{"size": len(payload)}), - "got payload", + "sp-recv-got", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { return logMsg(les) + ": got" }, ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", sds, err, "") + state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string { + return logMsg(les) + ": got" + }) break } state.Ctx.LogD( - "sp-recv", - SdsAdd(sds, SDS{"size": len(payload)}), - "processing", + "sp-recv-process", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return logMsg(les) + ": processing" + }, ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", sds, err, "") + state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string { + return logMsg(les) + ": processing" + }) break } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-recv", - SdsAdd(sds, SDS{"size": len(reply)}), - "queuing reply", + "sp-recv-reply", + append(les[:len(les)-1], LE{"Size", int64(len(reply))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(reply))), + ) + }, ) state.payloads <- reply } @@ -839,16 +1135,26 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} r := bytes.NewReader(payload) var err error var replies [][]byte var infosGot bool for r.Len() > 0 { - state.Ctx.LogD("sp-process", sds, "unmarshaling header") + state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", sds, err, "") + state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } if head.Type != SPTypePing { @@ -856,71 +1162,147 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } switch head.Type { case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") + state.Ctx.LogD( + "sp-process-halt", + append(les, LE{"Type", "halt"}), func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got HALT", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) state.Lock() state.queueTheir = nil state.Unlock() + case SPTypePing: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "") + state.Ctx.LogD( + "sp-process-ping", + append(les, LE{"Type", "ping"}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got PING", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) + case SPTypeInfo: infosGot = true - sdsp := SdsAdd(sds, SDS{"type": "info"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "info"}) + state.Ctx.LogD( + "sp-process-info-unmarshal", lesp, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", sdsp, err, "") + state.Ctx.LogE( + "sp-process-info-unmarshal", lesp, err, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) return nil, err } - sdsp = SdsAdd(sds, SDS{ - "pkt": Base32Codec.EncodeToString(info.Hash[:]), - "size": int64(info.Size), - "nice": int(info.Nice), - }) + pktName := Base32Codec.EncodeToString(info.Hash[:]) + lesp = append( + lesp, + LE{"Pkt", pktName}, + LE{"Size", int64(info.Size)}, + LE{"PktNice", int(info.Nice)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): INFO %s (%s) nice %s", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if !state.listOnly && info.Nice > state.Nice { - state.Ctx.LogD("sp-process", sdsp, "too nice") + state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string { + return logMsg(les) + ": too nice" + }) continue } - state.Ctx.LogD("sp-process", sdsp, "received") + state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string { + return logMsg(les) + ": received" + }) if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.Ctx.LogD("sp-process", sdsp, "stating part") + state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string { + return logMsg(les) + ": stating part" + }) pktPath := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), Base32Codec.EncodeToString(info.Hash[:]), ) + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Packet %s (%s) (nice %s)", + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if _, err = os.Stat(pktPath); err == nil { - state.Ctx.LogI("sp-info", sdsp, "already done") + state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string { + return logMsg(les) + ": already done" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + SeenSuffix); err == nil { - state.Ctx.LogI("sp-info", sdsp, "already seen") + state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { + return logMsg(les) + ": already seen" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } + if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { + state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string { + return logMsg(les) + ": still not checksummed" + }) + continue + } fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { offset = fi.Size() } if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { - state.Ctx.LogI("sp-info", sdsp, "not enough space") + state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string { + return logMsg(les) + ": not enough space" + }) continue } state.Ctx.LogI( "sp-info", - SdsAdd(sdsp, SDS{"offset": offset}), - "", + append(lesp, LE{"Offset", offset}), + func(les LEs) string { + return fmt.Sprintf( + "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size, + ) + }, ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { replies = append(replies, MarshalSP( @@ -928,52 +1310,116 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { SPFreq{info.Hash, uint64(offset)}, )) } + case SPTypeFile: - sdsp := SdsAdd(sds, SDS{"type": "file"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "file"}) + state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "") + state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - sdsp["xx"] = string(TRx) - sdsp["pkt"] = Base32Codec.EncodeToString(file.Hash[:]) - sdsp["size"] = len(file.Payload) + pktName := Base32Codec.EncodeToString(file.Hash[:]) + lesp = append( + lesp, + LE{"XX", string(TRx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(len(file.Payload))}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Got packet %s (%s)", + pktName, humanize.IBytes(uint64(len(file.Payload))), + ) + } dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), ) - filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) - state.Ctx.LogD("sp-file", sdsp, "opening part") - fd, err := os.OpenFile( - filePath+PartSuffix, - os.O_RDWR|os.O_CREATE, - os.FileMode(0666), - ) - if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") - return nil, err + filePath := filepath.Join(dirToSync, pktName) + filePathPart := filePath + PartSuffix + state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening part" + }) + fdAndFullSize, exists := state.fds[filePathPart] + var fd *os.File + if exists { + fd = fdAndFullSize.fd + } else { + fd, err = os.OpenFile( + filePathPart, + os.O_RDWR|os.O_CREATE, + os.FileMode(0666), + ) + if err != nil { + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening part" + }) + return nil, err + } + state.fds[filePathPart] = FdAndFullSize{fd: fd} + if file.Offset == 0 { + h, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + state.fileHashers[filePath] = &HasherAndOffset{h: h} + } } state.Ctx.LogD( - "sp-file", - SdsAdd(sdsp, SDS{"offset": file.Offset}), - "seeking", - ) + "sp-file-seek", + append(lesp, LE{"Offset", file.Offset}), + func(les LEs) string { + return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset) + }) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") - fd.Close() // #nosec G104 + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + state.closeFd(filePathPart) return nil, err } - state.Ctx.LogD("sp-file", sdsp, "writing") - _, err = fd.Write(file.Payload) - if err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "") - fd.Close() // #nosec G104 + state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string { + return logMsg(les) + ": writing" + }) + if _, err = fd.Write(file.Payload); err != nil { + state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string { + return logMsg(les) + ": writing" + }) + state.closeFd(filePathPart) return nil, err } + hasherAndOffset, hasherExists := state.fileHashers[filePath] + if hasherExists { + if hasherAndOffset.offset == file.Offset { + if _, err = hasherAndOffset.h.Write(file.Payload); err != nil { + panic(err) + } + hasherAndOffset.offset += uint64(len(file.Payload)) + } else { + state.Ctx.LogD( + "sp-file-offset-differs", lesp, + func(les LEs) string { + return logMsg(les) + ": offset differs, deleting hasher" + }, + ) + delete(state.fileHashers, filePath) + hasherExists = false + } + } ourSize := int64(file.Offset + uint64(len(file.Payload))) - sdsp["size"] = ourSize + lesp[len(lesp)-1].V = ourSize fullsize := int64(0) state.RLock() infoTheir, ok := state.infosTheir[*file.Hash] @@ -981,88 +1427,176 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if ok { fullsize = int64(infoTheir.Size) } - sdsp["fullsize"] = fullsize + lesp = append(lesp, LE{"FullSize", fullsize}) if state.Ctx.ShowPrgrs { - Progress("Rx", sdsp) + Progress("Rx", lesp) } if fullsize != ourSize { - fd.Close() // #nosec G104 continue } - spWorkersGroup.Wait() - spWorkersGroup.Add(1) - go func() { - if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "sync") - fd.Close() // #nosec G104 - return - } - state.wg.Add(1) - defer state.wg.Done() - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 - state.Ctx.LogE("sp-file", sdsp, err, "") - return - } - state.Ctx.LogD("sp-file", sdsp, "checking") - gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs) - fd.Close() // #nosec G104 - if err != nil || !gut { - state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "") - return + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Got packet %s %d%% (%s / %s)", + pktName, 100*ourSize/fullsize, + humanize.IBytes(uint64(ourSize)), + humanize.IBytes(uint64(fullsize)), + ) + } + err = fd.Sync() + if err != nil { + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) + state.closeFd(filePathPart) + continue + } + if hasherExists { + if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) + continue } - state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") - if err = os.Rename(filePath+PartSuffix, filePath); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "rename") - return + if err = os.Rename(filePathPart, filePath); err != nil { + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) + continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", sdsp, err, "sync") - return + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) + continue } - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() - spWorkersGroup.Done() + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" + }) state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) state.wg.Done() }() - }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.Ctx.HdrUsage { + state.closeFd(filePathPart) + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) + state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) + continue + } + state.closeFd(filePathPart) + if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) + continue + } + state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string { + return logMsg(les) + ": downloaded" + }) + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.NoCK { + state.checkerQueues.appeared <- file.Hash + } + case SPTypeDone: - sdsp := SdsAdd(sds, SDS{"type": "done"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "done"}) + state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "") + state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - sdsp["pkt"] = Base32Codec.EncodeToString(done.Hash[:]) - state.Ctx.LogD("sp-done", sdsp, "removing") - err := os.Remove(filepath.Join( + pktName := Base32Codec.EncodeToString(done.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): DONE: removing %s", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + } + state.Ctx.LogD("sp-done", lesp, logMsg) + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - Base32Codec.EncodeToString(done.Hash[:]), - )) - sdsp["xx"] = string(TTx) - if err == nil { - state.Ctx.LogI("sp-done", sdsp, "") + pktName, + ) + if err = os.Remove(pth); err == nil { + state.Ctx.LogI("sp-done", lesp, func(les LEs) string { + return fmt.Sprintf("Packet %s is sent", pktName) + }) + if state.Ctx.HdrUsage { + os.Remove(pth + HdrSuffix) + } } else { - state.Ctx.LogE("sp-done", sdsp, err, "") + state.Ctx.LogE("sp-done", lesp, err, logMsg) } + case SPTypeFreq: - sdsp := SdsAdd(sds, SDS{"type": "freq"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "freq"}) + state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", sdsp, err, "") + state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - sdsp["pkt"] = Base32Codec.EncodeToString(freq.Hash[:]) - sdsp["offset"] = freq.Offset - state.Ctx.LogD("sp-process", sdsp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset}) + state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: queuing", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) nice, exists := state.infosOurSeen[*freq.Hash] if exists { if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { @@ -1079,21 +1613,38 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} state.Unlock() } else { - state.Ctx.LogD("sp-process", sdsp, "skipping") + state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: skipping", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } } else { - state.Ctx.LogD("sp-process", sdsp, "unknown") + state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: unknown", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } + default: state.Ctx.LogE( - "sp-process", - SdsAdd(sds, SDS{"type": head.Type}), + "sp-process-type-unknown", + append(les, LE{"Type", head.Type}), errors.New("unknown type"), - "", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): %d", + state.Node.Name, NicenessFmt(state.Nice), head.Type, + ) + }, ) return nil, BadPktType } } + if infosGot { var pkts int var size uint64 @@ -1103,12 +1654,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { size += info.Size } state.RUnlock() - state.Ctx.LogI("sp-infos", SDS{ - "xx": string(TRx), - "node": state.Node.Id, - "pkts": pkts, - "size": int64(size), - }, "") + state.Ctx.LogI("sp-infos-rx", LEs{ + {"XX", string(TRx)}, + {"Node", state.Node.Id}, + {"Pkts", pkts}, + {"Size", int64(size)}, + }, func(les LEs) string { + return fmt.Sprintf( + "%s has got for us: %d packets, %s", + state.Node.Name, pkts, humanize.IBytes(size), + ) + }) } return payloadsSplit(replies), nil }