X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=6d31375e137ff3a044d0bb5c1ad8ea4c481b4dd0;hb=466a7b8cab7847bcee35aff5c35ed79361cde427;hp=e1207693cd9c985d13c9d67e9d250e15a5691304;hpb=785dbe18183ba25a478a50c1d96fe263b5ab43f2;p=nncp.git diff --git a/src/sp.go b/src/sp.go index e120769..6d31375 100644 --- a/src/sp.go +++ b/src/sp.go @@ -21,6 +21,7 @@ import ( "bytes" "crypto/subtle" "errors" + "fmt" "hash" "io" "os" @@ -30,6 +31,7 @@ import ( "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/flynn/noise" "golang.org/x/crypto/blake2b" ) @@ -40,6 +42,11 @@ const ( SPHeadOverhead = 4 ) +type SPCheckerQueues struct { + appeared chan *[32]byte + checked chan *[32]byte +} + var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} @@ -57,6 +64,8 @@ var ( DefaultDeadline = 10 * time.Second PingTimeout = time.Minute + + spCheckers = make(map[NodeId]*SPCheckerQueues) ) type FdAndFullSize struct { @@ -224,8 +233,9 @@ type SPState struct { onlyPkts map[[32]byte]bool writeSPBuf bytes.Buffer fds map[string]FdAndFullSize + fdsLock sync.RWMutex fileHashers map[string]*HasherAndOffset - checkerJobs chan *[32]byte + checkerQueues SPCheckerQueues sync.RWMutex } @@ -252,9 +262,6 @@ func (state *SPState) SetDead() { s.fd.Close() } }() - if !state.NoCK { - close(state.checkerJobs) - } } func (state *SPState) NotAlive() bool { @@ -271,28 +278,35 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) SPChecker() { - for hshValue := range state.checkerJobs { +func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { + for hshValue := range appeared { + pktName := Base32Codec.EncodeToString(hshValue[:]) les := LEs{ {"XX", string(TRx)}, - {"Node", state.Node.Id}, - {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, + {"Node", nodeId}, + {"Pkt", pktName}, } - state.Ctx.LogD("sp-file", les, "checking") - size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue) + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName) + }) + size, err := ctx.CheckNoCK(nodeId, hshValue) les = append(les, LE{"Size", size}) if err != nil { - state.Ctx.LogE("sp-file", les, err, "") + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) continue } - state.Ctx.LogI("sp-done", les, "") - state.wg.Add(1) - go func(hsh *[32]byte) { - if !state.NotAlive() { - state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) - } - state.wg.Done() - }(hshValue) + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) + go func(hsh *[32]byte) { checked <- hsh }(hshValue) } } @@ -355,19 +369,34 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var payloads [][]byte for _, info := range infos { payloads = append(payloads, MarshalSP(SPTypeInfo, info)) + pktName := Base32Codec.EncodeToString(info.Hash[:]) ctx.LogD("sp-info-our", LEs{ {"Node", nodeId}, - {"Name", Base32Codec.EncodeToString(info.Hash[:])}, + {"Name", pktName}, {"Size", info.Size}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "Our info: %s/tx/%s (%s)", + ctx.NodeName(nodeId), + pktName, + humanize.IBytes(info.Size), + ) + }) } if totalSize > 0 { - ctx.LogI("sp-infos", LEs{ + ctx.LogI("sp-infos-tx", LEs{ {"XX", string(TTx)}, {"Node", nodeId}, {"Pkts", len(payloads)}, {"Size", totalSize}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "We have got for %s: %d packets, %s", + ctx.NodeName(nodeId), + len(payloads), + humanize.IBytes(uint64(totalSize)), + ) + }) } return payloadsSplit(payloads) } @@ -437,30 +466,72 @@ func (state *SPState) StartI(conn ConnDeadlined) error { return err } les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}} - state.Ctx.LogD("sp-start", les, "sending first message") + state.Ctx.LogD("sp-startI", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "waiting for first message") + state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading Noise message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "starting workers") + state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() } return err @@ -492,14 +563,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error { var buf []byte var payload []byte - state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP nice %s: waiting for first message", + NicenessFmt(state.Nice), + ) + } + les := LEs{{"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startR", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", LEs{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", LEs{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } @@ -512,15 +590,16 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } if node == nil { peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "") - return errors.New("Unknown peer: " + peerId) + err = errors.New("unknown peer: " + peerId) + state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg) + return err } state.Node = node state.rxRate = node.RxRate state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} + les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err @@ -555,7 +634,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error { firstPayload = append(firstPayload, SPHaltMarshalized...) } - state.Ctx.LogD("sp-start", les, "sending first message") + state.Ctx.LogD("sp-startR-write", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + node.Name, NicenessFmt(state.Nice), + ) + }) buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) if err != nil { state.dirUnlock() @@ -563,11 +647,21 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + node.Name, NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "starting workers") + state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + node.Name, NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() @@ -576,25 +670,12 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } func (state *SPState) closeFd(pth string) { - s, exists := state.fds[pth] - delete(state.fds, pth) - if exists { + state.fdsLock.Lock() + if s, exists := state.fds[pth]; exists { + delete(state.fds, pth) s.fd.Close() } -} - -func (state *SPState) FillExistingNoCK() { - checkerJobs := make([]*[32]byte, 0) - for job := range state.Ctx.JobsNoCK(state.Node.Id) { - if job.PktEnc.Nice > state.Nice { - continue - } - checkerJobs = append(checkerJobs, job.HshValue) - } - for _, job := range checkerJobs { - state.checkerJobs <- job - } - state.wg.Done() + state.fdsLock.Unlock() } func (state *SPState) StartWorkers( @@ -612,10 +693,35 @@ func (state *SPState) StartWorkers( // Checker if !state.NoCK { - state.checkerJobs = make(chan *[32]byte) - go state.SPChecker() + queues := spCheckers[*state.Node.Id] + if queues == nil { + queues = &SPCheckerQueues{ + appeared: make(chan *[32]byte), + checked: make(chan *[32]byte), + } + spCheckers[*state.Node.Id] = queues + go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) + } + state.checkerQueues = *queues + go func() { + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice <= state.Nice { + state.checkerQueues.appeared <- job.HshValue + } + } + }() state.wg.Add(1) - go state.FillExistingNoCK() + go func() { + defer state.wg.Done() + for { + select { + case <-state.isDead: + return + case hsh := <-state.checkerQueues.checked: + state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + } + } + }() } // Remaining handshake payload sending @@ -624,9 +730,15 @@ func (state *SPState) StartWorkers( go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( - "sp-work", - append(les, LE{"Size", len(payload)}), - "queuing remaining payload", + "sp-queue-remaining", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing remaining payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -635,23 +747,32 @@ func (state *SPState) StartWorkers( } // Processing of first payload and queueing its responses - state.Ctx.LogD( - "sp-work", - append(les, LE{"Size", len(payload)}), - "processing first payload", - ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): processing first payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", les, err, "") + state.Ctx.LogE("sp-process", les, err, logMsg) return err } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-work", - append(les, LE{"Size", len(reply)}), - "queuing reply", + "sp-queue-reply", + append(les, LE{"Size", int64(len(reply))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- reply } @@ -708,9 +829,15 @@ func (state *SPState) StartWorkers( &state.infosOurSeen, ) { state.Ctx.LogD( - "sp-work", - append(les, LE{"Size", len(payload)}), - "queuing new info", + "sp-queue-info", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing new info (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -733,14 +860,25 @@ func (state *SPState) StartWorkers( var ping bool select { case <-state.pings: - state.Ctx.LogD("sp-xmit", les, "got ping") + state.Ctx.LogD("sp-got-ping", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got ping", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) payload = SPPingMarshalized ping = true case payload = <-state.payloads: state.Ctx.LogD( - "sp-xmit", - append(les, LE{"Size", len(payload)}), - "got payload", + "sp-got-payload", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) default: state.RLock() @@ -754,50 +892,87 @@ func (state *SPState) StartWorkers( if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - lesp := append(les, LEs{ - {"XX", string(TTx)}, - {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}, - {"Size", int64(freq.Offset)}, - }...) - state.Ctx.LogD("sp-file", lesp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp := append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(freq.Offset)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): tx/%s (%s)", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(freq.Offset), + ) + } + state.Ctx.LogD("sp-queue", lesp, func(les LEs) string { + return logMsg(les) + ": queueing" + }) pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(freq.Hash[:]), ) + state.fdsLock.RLock() fdAndFullSize, exists := state.fds[pth] + state.fdsLock.RUnlock() if !exists { fd, err := os.Open(pth) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening" + }) return } fi, err := fd.Stat() if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string { + return logMsg(les) + ": stating" + }) return } fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} + state.fdsLock.Lock() state.fds[pth] = fdAndFullSize + state.fdsLock.Unlock() } fd := fdAndFullSize.fd fullSize := fdAndFullSize.fullSize var buf []byte if freq.Offset < uint64(fullSize) { - state.Ctx.LogD("sp-file", lesp, "seeking") + state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { + return logMsg(les) + ": seeking" + }) if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": reading" + }) return } buf = buf[:n] - state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read") + lesp = append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(n)}, + ) + state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string { + return fmt.Sprintf( + "%s: read %s", + logMsg(les), humanize.IBytes(uint64(n)), + ) + }) } state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ @@ -806,15 +981,22 @@ func (state *SPState) StartWorkers( Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - lesp = append(lesp, LE{"Size", int64(ourSize)}) - lesp = append(lesp, LE{"FullSize", fullSize}) + lesp = append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(ourSize)}, + LE{"FullSize", fullSize}, + ) if state.Ctx.ShowPrgrs { Progress("Tx", lesp) } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file", lesp, "finished") + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { @@ -824,14 +1006,23 @@ func (state *SPState) StartWorkers( state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { - state.Ctx.LogD("sp-file", lesp, "queue disappeared") + state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string { + return logMsg(les) + ": queue disappeared" + }) } state.Unlock() } - state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending %s", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { - state.Ctx.LogE("sp-xmit", les, err, "") + state.Ctx.LogE("sp-sending", les, err, logMsg) return } } @@ -844,7 +1035,13 @@ func (state *SPState) StartWorkers( if state.NotAlive() { break } - state.Ctx.LogD("sp-recv", les, "waiting for payload") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for payload", + state.Node.Name, NicenessFmt(state.Nice), + ) + } + state.Ctx.LogD("sp-recv-wait", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { @@ -858,36 +1055,55 @@ func (state *SPState) StartWorkers( if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-wait", les, err, logMsg) break } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } state.Ctx.LogD( - "sp-recv", - append(les, LE{"Size", len(payload)}), - "got payload", + "sp-recv-got", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { return logMsg(les) + ": got" }, ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string { + return logMsg(les) + ": got" + }) break } state.Ctx.LogD( - "sp-recv", - append(les, LE{"Size", len(payload)}), - "processing", + "sp-recv-process", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return logMsg(les) + ": processing" + }, ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string { + return logMsg(les) + ": processing" + }) break } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-recv", - append(les, LE{"Size", len(reply)}), - "queuing reply", + "sp-recv-reply", + append(les[:len(les)-1], LE{"Size", int64(len(reply))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(reply))), + ) + }, ) state.payloads <- reply } @@ -931,10 +1147,20 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { var replies [][]byte var infosGot bool for r.Len() > 0 { - state.Ctx.LogD("sp-process", les, "unmarshaling header") + state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", les, err, "") + state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } if head.Type != SPTypePing { @@ -942,62 +1168,126 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } switch head.Type { case SPTypeHalt: - state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "") + state.Ctx.LogD( + "sp-process-halt", + append(les, LE{"Type", "halt"}), func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got HALT", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) state.Lock() state.queueTheir = nil state.Unlock() case SPTypePing: - state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") + state.Ctx.LogD( + "sp-process-ping", + append(les, LE{"Type", "ping"}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got PING", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) case SPTypeInfo: infosGot = true lesp := append(les, LE{"Type", "info"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD( + "sp-process-info-unmarshal", lesp, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE( + "sp-process-info-unmarshal", lesp, err, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) return nil, err } - lesp = append(lesp, LEs{ - {"Pkt", Base32Codec.EncodeToString(info.Hash[:])}, - {"Size", int64(info.Size)}, - {"Nice", int(info.Nice)}, - }...) + pktName := Base32Codec.EncodeToString(info.Hash[:]) + lesp = append( + lesp, + LE{"Pkt", pktName}, + LE{"Size", int64(info.Size)}, + LE{"PktNice", int(info.Nice)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): INFO %s (%s) nice %s", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if !state.listOnly && info.Nice > state.Nice { - state.Ctx.LogD("sp-process", lesp, "too nice") + state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string { + return logMsg(les) + ": too nice" + }) continue } - state.Ctx.LogD("sp-process", lesp, "received") + state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string { + return logMsg(les) + ": received" + }) if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.Ctx.LogD("sp-process", lesp, "stating part") + state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string { + return logMsg(les) + ": stating part" + }) pktPath := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), Base32Codec.EncodeToString(info.Hash[:]), ) + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Packet %s (%s) (nice %s)", + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if _, err = os.Stat(pktPath); err == nil { - state.Ctx.LogI("sp-info", lesp, "already done") + state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string { + return logMsg(les) + ": already done" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + SeenSuffix); err == nil { - state.Ctx.LogI("sp-info", lesp, "already seen") + state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { + return logMsg(les) + ": already seen" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { - state.Ctx.LogI("sp-info", lesp, "still non checksummed") + state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string { + return logMsg(les) + ": still not checksummed" + }) continue } fi, err := os.Stat(pktPath + PartSuffix) @@ -1006,10 +1296,20 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { offset = fi.Size() } if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { - state.Ctx.LogI("sp-info", lesp, "not enough space") + state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string { + return logMsg(les) + ": not enough space" + }) continue } - state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "") + state.Ctx.LogI( + "sp-info", + append(lesp, LE{"Offset", offset}), + func(les LEs) string { + return fmt.Sprintf( + "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size, + ) + }, + ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { replies = append(replies, MarshalSP( SPTypeFreq, @@ -1019,26 +1319,48 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { case SPTypeFile: lesp := append(les, LE{"Type", "file"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LEs{ - {"XX", string(TRx)}, - {"Pkt", Base32Codec.EncodeToString(file.Hash[:])}, - {"Size", len(file.Payload)}, - }...) + pktName := Base32Codec.EncodeToString(file.Hash[:]) + lesp = append( + lesp, + LE{"XX", string(TRx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(len(file.Payload))}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Got packet %s (%s)", + pktName, humanize.IBytes(uint64(len(file.Payload))), + ) + } dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), ) - filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) + filePath := filepath.Join(dirToSync, pktName) filePathPart := filePath + PartSuffix - state.Ctx.LogD("sp-file", lesp, "opening part") + state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening part" + }) + state.fdsLock.RLock() fdAndFullSize, exists := state.fds[filePathPart] + state.fdsLock.RUnlock() var fd *os.File if exists { fd = fdAndFullSize.fd @@ -1049,10 +1371,14 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { os.FileMode(0666), ) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening part" + }) return nil, err } + state.fdsLock.Lock() state.fds[filePathPart] = FdAndFullSize{fd: fd} + state.fdsLock.Unlock() if file.Offset == 0 { h, err := blake2b.New256(nil) if err != nil { @@ -1061,15 +1387,26 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.fileHashers[filePath] = &HasherAndOffset{h: h} } } - state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") + state.Ctx.LogD( + "sp-file-seek", + append(lesp, LE{"Offset", file.Offset}), + func(les LEs) string { + return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset) + }) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) state.closeFd(filePathPart) return nil, err } - state.Ctx.LogD("sp-file", lesp, "writing") + state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string { + return logMsg(les) + ": writing" + }) if _, err = fd.Write(file.Payload); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string { + return logMsg(les) + ": writing" + }) state.closeFd(filePathPart) return nil, err } @@ -1081,10 +1418,11 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } hasherAndOffset.offset += uint64(len(file.Payload)) } else { - state.Ctx.LogE( - "sp-file", lesp, - errors.New("offset differs"), - "deleting hasher", + state.Ctx.LogD( + "sp-file-offset-differs", lesp, + func(les LEs) string { + return logMsg(les) + ": offset differs, deleting hasher" + }, ) delete(state.fileHashers, filePath) hasherExists = false @@ -1106,26 +1444,46 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if fullsize != ourSize { continue } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Got packet %s %d%% (%s / %s)", + pktName, 100*ourSize/fullsize, + humanize.IBytes(uint64(ourSize)), + humanize.IBytes(uint64(fullsize)), + ) + } err = fd.Sync() if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) state.closeFd(filePathPart) continue } if hasherExists { if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { - state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) continue } if err = os.Rename(filePathPart, filePath); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) continue } - state.Ctx.LogI("sp-file", lesp, "done") + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" + }) state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) @@ -1139,14 +1497,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } if _, err = fd.Seek(0, io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "seek") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) state.closeFd(filePathPart) continue } _, pktEncRaw, err := state.Ctx.HdrRead(fd) state.closeFd(filePathPart) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "HdrRead") + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) continue } state.Ctx.HdrWrite(pktEncRaw, filePath) @@ -1154,59 +1516,97 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.closeFd(filePathPart) if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) continue } - state.Ctx.LogI("sp-file", lesp, "downloaded") + state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string { + return logMsg(les) + ": downloaded" + }) state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() if !state.NoCK { - state.checkerJobs <- file.Hash + state.checkerQueues.appeared <- file.Hash } case SPTypeDone: lesp := append(les, LE{"Type", "done"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) - state.Ctx.LogD("sp-done", lesp, "removing") + pktName := Base32Codec.EncodeToString(done.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): DONE: removing %s", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + } + state.Ctx.LogD("sp-done", lesp, logMsg) pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - Base32Codec.EncodeToString(done.Hash[:]), + pktName, ) - err := os.Remove(pth) - lesp = append(lesp, LE{"XX", string(TTx)}) - if err == nil { - state.Ctx.LogI("sp-done", lesp, "") + if err = os.Remove(pth); err == nil { + state.Ctx.LogI("sp-done", lesp, func(les LEs) string { + return fmt.Sprintf("Packet %s is sent", pktName) + }) if state.Ctx.HdrUsage { os.Remove(pth + HdrSuffix) } } else { - state.Ctx.LogE("sp-done", lesp, err, "") + state.Ctx.LogE("sp-done", lesp, err, logMsg) } case SPTypeFreq: lesp := append(les, LE{"Type", "freq"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}) - lesp = append(lesp, LE{"Offset", freq.Offset}) - state.Ctx.LogD("sp-process", lesp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset}) + state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: queuing", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) nice, exists := state.infosOurSeen[*freq.Hash] if exists { if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { @@ -1223,22 +1623,38 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} state.Unlock() } else { - state.Ctx.LogD("sp-process", lesp, "skipping") + state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: skipping", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } } else { - state.Ctx.LogD("sp-process", lesp, "unknown") + state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: unknown", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } default: state.Ctx.LogE( - "sp-process", + "sp-process-type-unknown", append(les, LE{"Type", head.Type}), errors.New("unknown type"), - "", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): %d", + state.Node.Name, NicenessFmt(state.Nice), head.Type, + ) + }, ) return nil, BadPktType } } + if infosGot { var pkts int var size uint64 @@ -1248,12 +1664,17 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { size += info.Size } state.RUnlock() - state.Ctx.LogI("sp-infos", LEs{ + state.Ctx.LogI("sp-infos-rx", LEs{ {"XX", string(TRx)}, {"Node", state.Node.Id}, {"Pkts", pkts}, {"Size", int64(size)}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "%s has got for us: %d packets, %s", + state.Node.Name, pkts, humanize.IBytes(size), + ) + }) } return payloadsSplit(replies), nil }