X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=4db358742a17b1f584aa102ac853a1c6386c6d13;hb=0367cce2741e1ce6a89a49fd5c4e9df6005c9744;hp=6dd724a59807b744b404763d03f8991f4cdc556f;hpb=2f826e9b891bcf36cdb77fb5fec27a13f7911261;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 6dd724a..4db3587 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2019 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -21,33 +21,44 @@ import ( "bytes" "crypto/subtle" "errors" + "fmt" "io" - "net" + "log" "os" "path/filepath" "sort" - "strconv" "sync" "time" - "github.com/davecgh/go-xdr/xdr2" + xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/flynn/noise" ) const ( - MaxSPSize = 1<<16 - 256 - PartSuffix = ".part" - DefaultDeadline = 10 + MaxSPSize = 1<<16 - 256 + PartSuffix = ".part" + SPHeadOverhead = 4 ) -var ( - MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} +type MTHAndOffset struct { + mth MTH + offset uint64 +} - SPHeadOverhead int +type SPCheckerTask struct { + nodeId *NodeId + hsh *[MTHSize]byte + mth MTH + done chan []byte +} + +var ( SPInfoOverhead int SPFreqOverhead int SPFileOverhead int SPHaltMarshalized []byte + SPPingMarshalized []byte NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite( noise.DH25519, @@ -55,9 +66,19 @@ var ( noise.HashBLAKE2b, ) - spWorkersGroup sync.WaitGroup + DefaultDeadline = 10 * time.Second + PingTimeout = time.Minute + + spCheckerTasks chan SPCheckerTask + SPCheckerWg sync.WaitGroup + spCheckerOnce sync.Once ) +type FdAndFullSize struct { + fd *os.File + fullSize int64 +} + type SPType uint8 const ( @@ -66,6 +87,7 @@ const ( SPTypeFile SPType = iota SPTypeDone SPType = iota SPTypeHalt SPType = iota + SPTypePing SPType = iota ) type SPHead struct { @@ -75,22 +97,22 @@ type SPHead struct { type SPInfo struct { Nice uint8 Size uint64 - Hash *[32]byte + Hash *[MTHSize]byte } type SPFreq struct { - Hash *[32]byte + Hash *[MTHSize]byte Offset uint64 } type SPFile struct { - Hash *[32]byte + Hash *[MTHSize]byte Offset uint64 Payload []byte } type SPDone struct { - Hash *[32]byte + Hash *[MTHSize]byte } type SPRaw struct { @@ -115,38 +137,46 @@ func init() { if _, err := xdr.Marshal(&buf, spHead); err != nil { panic(err) } + SPHaltMarshalized = make([]byte, SPHeadOverhead) copy(SPHaltMarshalized, buf.Bytes()) - SPHeadOverhead = buf.Len() buf.Reset() - spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} + spHead = SPHead{Type: SPTypePing} + if _, err := xdr.Marshal(&buf, spHead); err != nil { + panic(err) + } + SPPingMarshalized = make([]byte, SPHeadOverhead) + copy(SPPingMarshalized, buf.Bytes()) + buf.Reset() + + spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)} if _, err := xdr.Marshal(&buf, spInfo); err != nil { panic(err) } SPInfoOverhead = buf.Len() buf.Reset() - spFreq := SPFreq{Hash: new([32]byte), Offset: 123} + spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123} if _, err := xdr.Marshal(&buf, spFreq); err != nil { panic(err) } SPFreqOverhead = buf.Len() buf.Reset() - spFile := SPFile{Hash: new([32]byte), Offset: 123} + spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123} if _, err := xdr.Marshal(&buf, spFile); err != nil { panic(err) } SPFileOverhead = buf.Len() + spCheckerTasks = make(chan SPCheckerTask) } func MarshalSP(typ SPType, sp interface{}) []byte { var buf bytes.Buffer - var err error - if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil { + if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil { panic(err) } - if _, err = xdr.Marshal(&buf, sp); err != nil { + if _, err := xdr.Marshal(&buf, sp); err != nil { panic(err) } return buf.Bytes() @@ -172,21 +202,26 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 - onlineDeadline uint - maxOnlineTime uint + NoCK bool + onlineDeadline time.Duration + maxOnlineTime time.Duration hs *noise.HandshakeState csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte - infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]uint8 + pings chan struct{} + infosTheir map[[MTHSize]byte]*SPInfo + infosOurSeen map[[MTHSize]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time + RxLastNonPing time.Time TxBytes int64 TxLastSeen time.Time + TxLastNonPing time.Time started time.Time + mustFinishAt time.Time Duration time.Duration RxSpeed int64 TxSpeed int64 @@ -195,22 +230,44 @@ type SPState struct { xxOnly TRxTx rxRate int txRate int - isDead bool + isDead chan struct{} listOnly bool - onlyPkts map[[32]byte]bool + onlyPkts map[[MTHSize]byte]bool + writeSPBuf bytes.Buffer + fds map[string]FdAndFullSize + fdsLock sync.RWMutex + fileHashers map[string]*MTHAndOffset + progressBars map[string]struct{} sync.RWMutex } -func (state *SPState) NotAlive() bool { - if state.isDead { - return true +func (state *SPState) SetDead() { + state.Lock() + defer state.Unlock() + select { + case <-state.isDead: + // Already closed channel, dead + return + default: } - now := time.Now() - if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { + close(state.isDead) + go func() { + for range state.payloads { + } + }() + go func() { + for range state.pings { + } + }() +} + +func (state *SPState) NotAlive() bool { + select { + case <-state.isDead: return true + default: } - return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && - uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline + return false } func (state *SPState) dirUnlock() { @@ -218,11 +275,21 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { - n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload}) - if err == nil { +func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { + state.writeSPBuf.Reset() + n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ + Magic: MagicNNCPSv1.B, + Payload: payload, + }) + if err != nil { + return err + } + if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil { state.TxLastSeen = time.Now() state.TxBytes += int64(n) + if !ping { + state.TxLastNonPing = state.TxLastSeen + } } return err } @@ -231,21 +298,24 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { var sp SPRaw n, err := xdr.UnmarshalLimited(src, &sp, 1<<17) if err != nil { + ue := err.(*xdr.UnmarshalError) + if ue.Err == io.EOF { + return nil, ue.Err + } return nil, err } state.RxLastSeen = time.Now() state.RxBytes += int64(n) - if sp.Magic != MagicNNCPLv1 { + if sp.Magic != MagicNNCPSv1.B { return nil, BadMagic } return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() if job.PktEnc.Nice > nice { continue } @@ -264,19 +334,34 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var payloads [][]byte for _, info := range infos { payloads = append(payloads, MarshalSP(SPTypeInfo, info)) - ctx.LogD("sp-info-our", SDS{ - "node": nodeId, - "name": ToBase32(info.Hash[:]), - "size": strconv.FormatInt(int64(info.Size), 10), - }, "") + pktName := Base32Codec.EncodeToString(info.Hash[:]) + ctx.LogD("sp-info-our", LEs{ + {"Node", nodeId}, + {"Name", pktName}, + {"Size", info.Size}, + }, func(les LEs) string { + return fmt.Sprintf( + "Our info: %s/tx/%s (%s)", + ctx.NodeName(nodeId), + pktName, + humanize.IBytes(info.Size), + ) + }) } if totalSize > 0 { - ctx.LogI("sp-infos", SDS{ - "xx": string(TTx), - "node": nodeId, - "pkts": strconv.Itoa(len(payloads)), - "size": strconv.FormatInt(totalSize, 10), - }, "") + ctx.LogI("sp-infos-tx", LEs{ + {"XX", string(TTx)}, + {"Node", nodeId}, + {"Pkts", len(payloads)}, + {"Size", totalSize}, + }, func(les LEs) string { + return fmt.Sprintf( + "We have got for %s: %d packets, %s", + ctx.NodeName(nodeId), + len(payloads), + humanize.IBytes(uint64(totalSize)), + ) + }) } return payloadsSplit(payloads) } @@ -289,14 +374,14 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } var rxLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) { - rxLock, err = state.Ctx.LockDir(nodeId, TRx) + rxLock, err = state.Ctx.LockDir(nodeId, string(TRx)) if err != nil { return err } } var txLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { - txLock, err = state.Ctx.LockDir(nodeId, TTx) + txLock, err = state.Ctx.LockDir(nodeId, string(TTx)) if err != nil { return err } @@ -318,8 +403,10 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } state.hs = hs state.payloads = make(chan []byte) - state.infosTheir = make(map[[32]byte]*SPInfo) - state.infosOurSeen = make(map[[32]byte]uint8) + state.pings = make(chan struct{}) + state.infosTheir = make(map[[MTHSize]byte]*SPInfo) + state.infosOurSeen = make(map[[MTHSize]byte]uint8) + state.progressBars = make(map[string]struct{}) state.started = started state.rxLock = rxLock state.txLock = txLock @@ -344,33 +431,74 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.dirUnlock() return err } - sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(state.Nice))} - state.Ctx.LogD("sp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startI", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + if err = state.WriteSP(conn, buf, false); err != nil { + state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading Noise message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() - return err } return err } @@ -393,51 +521,62 @@ func (state *SPState) StartR(conn ConnDeadlined) error { xxOnly := TRxTx("") state.hs = hs state.payloads = make(chan []byte) - state.infosOurSeen = make(map[[32]byte]uint8) - state.infosTheir = make(map[[32]byte]*SPInfo) + state.pings = make(chan struct{}) + state.infosOurSeen = make(map[[MTHSize]byte]uint8) + state.infosTheir = make(map[[MTHSize]byte]*SPInfo) + state.progressBars = make(map[string]struct{}) state.started = started state.xxOnly = xxOnly + var buf []byte var payload []byte - state.Ctx.LogD( - "sp-start", - SDS{"nice": strconv.Itoa(int(state.Nice))}, - "waiting for first message", - ) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP nice %s: waiting for first message", + NicenessFmt(state.Nice), + ) + } + les := LEs{{"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startR", les, logMsg) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SDS{"err": err}, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", SDS{"err": err}, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } var node *Node - for _, node = range state.Ctx.Neigh { - if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 { + for _, n := range state.Ctx.Neigh { + if n.NoisePub == nil { + continue + } + if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 { + node = n break } } if node == nil { - peerId := ToBase32(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") - return errors.New("Unknown peer: " + peerId) + peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) + err = errors.New("unknown peer: " + peerId) + state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg) + return err } state.Node = node state.rxRate = node.RxRate state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(state.Nice))} + les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} - if state.Ctx.ensureRxDir(node.Id); err != nil { + if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err } var rxLock *os.File if xxOnly == "" || xxOnly == TRx { - rxLock, err = state.Ctx.LockDir(node.Id, TRx) + rxLock, err = state.Ctx.LockDir(node.Id, string(TRx)) if err != nil { return err } @@ -445,7 +584,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.rxLock = rxLock var txLock *os.File if xxOnly == "" || xxOnly == TTx { - txLock, err = state.Ctx.LockDir(node.Id, TTx) + txLock, err = state.Ctx.LockDir(node.Id, string(TTx)) if err != nil { return err } @@ -465,264 +604,501 @@ func (state *SPState) StartR(conn ConnDeadlined) error { firstPayload = append(firstPayload, SPHaltMarshalized...) } - state.Ctx.LogD("sp-start", sds, "sending first message") + state.Ctx.LogD("sp-startR-write", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + node.Name, NicenessFmt(state.Nice), + ) + }) buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) if err != nil { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + if err = state.WriteSP(conn, buf, false); err != nil { + state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + node.Name, NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + node.Name, NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() - return err } return err } +func (state *SPState) closeFd(pth string) { + state.fdsLock.Lock() + if s, exists := state.fds[pth]; exists { + delete(state.fds, pth) + s.fd.Close() + } + state.fdsLock.Unlock() +} + func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, - payload []byte) error { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} + payload []byte, +) error { + les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} + state.fds = make(map[string]FdAndFullSize) + state.fileHashers = make(map[string]*MTHAndOffset) + state.isDead = make(chan struct{}) + if state.maxOnlineTime > 0 { + state.mustFinishAt = state.started.Add(state.maxOnlineTime) + } + if !state.NoCK { + spCheckerOnce.Do(func() { go SPChecker(state.Ctx) }) + go func() { + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice <= state.Nice { + spCheckerTasks <- SPCheckerTask{ + nodeId: state.Node.Id, + hsh: job.HshValue, + done: state.payloads, + } + } + } + }() + } + + // Remaining handshake payload sending if len(infosPayloads) > 1 { + state.wg.Add(1) go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "queuing remaining payload", + "sp-queue-remaining", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing remaining payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } + state.wg.Done() }() } - state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "processing first payload", - ) + + // Processing of first payload and queueing its responses + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): processing first payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", les, err, logMsg) return err } - + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), - "queuing reply", + "sp-queue-reply", + append(les, LE{"Size", int64(len(reply))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- reply } + state.wg.Done() + }() + + // Periodic jobs + state.wg.Add(1) + go func() { + deadlineTicker := time.NewTicker(time.Second) + pingTicker := time.NewTicker(PingTimeout) + for { + select { + case <-state.isDead: + state.wg.Done() + deadlineTicker.Stop() + pingTicker.Stop() + return + case now := <-deadlineTicker.C: + if now.Sub(state.RxLastNonPing) >= state.onlineDeadline && + now.Sub(state.TxLastNonPing) >= state.onlineDeadline { + goto Deadlined + } + if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) { + goto Deadlined + } + if now.Sub(state.RxLastSeen) >= 2*PingTimeout { + goto Deadlined + } + break + Deadlined: + state.SetDead() + conn.Close() + case now := <-pingTicker.C: + if now.After(state.TxLastSeen.Add(PingTimeout)) { + state.wg.Add(1) + go func() { + state.pings <- struct{}{} + state.wg.Done() + }() + } + } + } }() + // Spool checker and INFOs sender of appearing files if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { + state.wg.Add(1) go func() { - for range time.Tick(time.Second) { - if state.NotAlive() { + dw, err := state.Ctx.NewDirWatcher( + filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)), + time.Second, + ) + if err != nil { + state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg) + log.Fatalln(err) + } + for { + select { + case <-state.isDead: + dw.Close() + state.wg.Done() return - } - for _, payload := range state.Ctx.infosOur( - state.Node.Id, - state.Nice, - &state.infosOurSeen, - ) { - state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "queuing new info", - ) - state.payloads <- payload + case <-dw.C: + for _, payload := range state.Ctx.infosOur( + state.Node.Id, + state.Nice, + &state.infosOurSeen, + ) { + state.Ctx.LogD( + "sp-queue-info", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing new info (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, + ) + state.payloads <- payload + } } } }() } + // Sender state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() + defer conn.Close() + defer state.SetDead() + defer state.wg.Done() + buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) for { if state.NotAlive() { return } var payload []byte + var ping bool select { + case <-state.pings: + state.Ctx.LogD("sp-got-ping", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got ping", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) + payload = SPPingMarshalized + ping = true case payload = <-state.payloads: state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "got payload", + "sp-got-payload", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) default: - } - if payload == nil { state.RLock() if len(state.queueTheir) == 0 { - state.Ctx.LogD("sp-xmit", sds, "file queue is empty") state.RUnlock() time.Sleep(100 * time.Millisecond) continue } freq := state.queueTheir[0].freq state.RUnlock() - if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - - sdsp := SdsAdd(sds, SDS{ - "xx": string(TTx), - "hash": ToBase32(freq.Hash[:]), - "size": strconv.FormatInt(int64(freq.Offset), 10), + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp := append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(freq.Offset)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): tx/%s (%s)", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(freq.Offset), + ) + } + state.Ctx.LogD("sp-queue", lesp, func(les LEs) string { + return logMsg(les) + ": queueing" }) - state.Ctx.LogD("sp-file", sdsp, "queueing") - fd, err := os.Open(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - ToBase32(freq.Hash[:]), - )) - if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break - } - fi, err := fd.Stat() - if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + Base32Codec.EncodeToString(freq.Hash[:]), + ) + state.fdsLock.RLock() + fdAndFullSize, exists := state.fds[pth] + state.fdsLock.RUnlock() + if !exists { + state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening" + }) + fd, err := os.Open(pth) + if err != nil { + state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening" + }) + return + } + fi, err := fd.Stat() + if err != nil { + state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string { + return logMsg(les) + ": stating" + }) + return + } + fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} + state.fdsLock.Lock() + state.fds[pth] = fdAndFullSize + state.fdsLock.Unlock() } - fullSize := uint64(fi.Size()) - var buf []byte - if freq.Offset < fullSize { - state.Ctx.LogD("sp-file", sdsp, "seeking") + fd := fdAndFullSize.fd + fullSize := fdAndFullSize.fullSize + lesp = append(lesp, LE{"FullSize", fullSize}) + var bufRead []byte + if freq.Offset < uint64(fullSize) { + state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { + return logMsg(les) + ": seeking" + }) if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + return } - buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": reading" + }) + return } - buf = buf[:n] - state.Ctx.LogD( - "sp-file", - SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}), - "read", + bufRead = buf[:n] + lesp = append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(n)}, + LE{"FullSize", fullSize}, ) + state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string { + return fmt.Sprintf( + "%s: read %s", + logMsg(les), humanize.IBytes(uint64(n)), + ) + }) + } else { + state.closeFd(pth) } - fd.Close() payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, - Payload: buf, + Payload: bufRead, }) - ourSize := freq.Offset + uint64(len(buf)) - sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10) - state.Ctx.LogP("sp-file", sdsp, "") + ourSize := freq.Offset + uint64(len(bufRead)) + lesp = append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(ourSize)}, + LE{"FullSize", fullSize}, + ) + if state.Ctx.ShowPrgrs { + state.progressBars[pktName] = struct{}{} + Progress("Tx", lesp) + } + if ourSize == uint64(fullSize) { + state.closeFd(pth) + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) + if state.Ctx.ShowPrgrs { + delete(state.progressBars, pktName) + } + } state.Lock() - if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { - if ourSize == fullSize { - state.Ctx.LogD("sp-file", sdsp, "finished") - if len(state.queueTheir) > 1 { - state.queueTheir = state.queueTheir[1:] - } else { - state.queueTheir = state.queueTheir[:0] - } + for i, q := range state.queueTheir { + if *q.freq.Hash != *freq.Hash { + continue + } + if ourSize == uint64(fullSize) { + state.queueTheir = append( + state.queueTheir[:i], + state.queueTheir[i+1:]..., + ) } else { - state.queueTheir[0].freq.Offset += uint64(len(buf)) + q.freq.Offset = ourSize } - } else { - state.Ctx.LogD("sp-file", sdsp, "queue disappeared") + break } state.Unlock() } - state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "sending", - ) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { - state.Ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") - break + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending %s", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg) + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) + ct, err := state.csOur.Encrypt(nil, nil, payload) + if err != nil { + state.Ctx.LogE("sp-encrypting", les, err, logMsg) + return + } + if err := state.WriteSP(conn, ct, ping); err != nil { + state.Ctx.LogE("sp-sending", les, err, logMsg) + return } } }() + // Receiver state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() for { if state.NotAlive() { - return + break } - state.Ctx.LogD("sp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for payload", + state.Node.Name, NicenessFmt(state.Nice), + ) + } + state.Ctx.LogD("sp-recv-wait", les, logMsg) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) payload, err := state.ReadSP(conn) if err != nil { + if err == io.EOF { + break + } unmarshalErr := err.(*xdr.UnmarshalError) - netErr, ok := unmarshalErr.Err.(net.Error) - if ok && netErr.Timeout() { + if os.IsTimeout(unmarshalErr.Err) { continue } if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv-wait", les, err, logMsg) break } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } state.Ctx.LogD( - "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "got payload", + "sp-recv-got", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { return logMsg(les) + ": got" }, ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string { + return logMsg(les) + ": got" + }) break } state.Ctx.LogD( - "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "processing", + "sp-recv-process", + append(les, LE{"Size", int64(len(payload))}), + func(les LEs) string { + return logMsg(les) + ": processing" + }, ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string { + return logMsg(les) + ": processing" + }) break } + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), - "queuing reply", + "sp-recv-reply", + append(les[:len(les)-1], LE{"Size", int64(len(reply))}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(reply))), + ) + }, ) state.payloads <- reply } + state.wg.Done() }() if state.rxRate > 0 { time.Sleep(time.Second / time.Duration(state.rxRate)) } } + state.SetDead() + state.wg.Done() + state.SetDead() + conn.Close() }() return nil @@ -730,8 +1106,10 @@ func (state *SPState) StartWorkers( func (state *SPState) Wait() { state.wg.Wait() - state.dirUnlock() + close(state.payloads) + close(state.pings) state.Duration = time.Now().Sub(state.started) + state.dirUnlock() state.RxSpeed = state.RxBytes state.TxSpeed = state.TxBytes rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds()) @@ -742,81 +1120,186 @@ func (state *SPState) Wait() { if txDuration > 0 { state.TxSpeed = state.TxBytes / txDuration } + for _, s := range state.fds { + s.fd.Close() + } + for pktName := range state.progressBars { + ProgressKill(pktName) + } } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} + les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} r := bytes.NewReader(payload) var err error var replies [][]byte var infosGot bool for r.Len() > 0 { - state.Ctx.LogD("sp-process", sds, "unmarshaling header") + state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } + if head.Type != SPTypePing { + state.RxLastNonPing = state.RxLastSeen + } switch head.Type { + case SPTypeHalt: + state.Ctx.LogD( + "sp-process-halt", + append(les, LE{"Type", "halt"}), func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got HALT", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) + state.Lock() + state.queueTheir = nil + state.Unlock() + + case SPTypePing: + state.Ctx.LogD( + "sp-process-ping", + append(les, LE{"Type", "ping"}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got PING", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) + case SPTypeInfo: infosGot = true - sdsp := SdsAdd(sds, SDS{"type": "info"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "info"}) + state.Ctx.LogD( + "sp-process-info-unmarshal", lesp, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE( + "sp-process-info-unmarshal", lesp, err, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) return nil, err } - sdsp = SdsAdd(sds, SDS{ - "hash": ToBase32(info.Hash[:]), - "size": strconv.FormatInt(int64(info.Size), 10), - "nice": strconv.Itoa(int(info.Nice)), - }) + pktName := Base32Codec.EncodeToString(info.Hash[:]) + lesp = append( + lesp, + LE{"Pkt", pktName}, + LE{"Size", int64(info.Size)}, + LE{"PktNice", int(info.Nice)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): INFO %s (%s) nice %s", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if !state.listOnly && info.Nice > state.Nice { - state.Ctx.LogD("sp-process", sdsp, "too nice") + state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string { + return logMsg(les) + ": too nice" + }) continue } - state.Ctx.LogD("sp-process", sdsp, "received") + state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string { + return logMsg(les) + ": received" + }) if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.Ctx.LogD("sp-process", sdsp, "stating part") + state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string { + return logMsg(les) + ": stating part" + }) pktPath := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), - ToBase32(info.Hash[:]), + Base32Codec.EncodeToString(info.Hash[:]), ) + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Packet %s (%s) (nice %s)", + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if _, err = os.Stat(pktPath); err == nil { - state.Ctx.LogI("sp-info", sdsp, "already done") + state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string { + return logMsg(les) + ": already done" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } - if _, err = os.Stat(pktPath + SeenSuffix); err == nil { - state.Ctx.LogI("sp-info", sdsp, "already seen") + if _, err = os.Stat(filepath.Join( + state.Ctx.Spool, state.Node.Id.String(), string(TRx), + SeenDir, Base32Codec.EncodeToString(info.Hash[:]), + )); err == nil { + state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { + return logMsg(les) + ": already seen" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } + if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { + state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string { + return logMsg(les) + ": still not checksummed" + }) + continue + } fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { offset = fi.Size() } if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { - state.Ctx.LogI("sp-info", sdsp, "not enough space") + state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string { + return logMsg(les) + ": not enough space" + }) continue } state.Ctx.LogI( "sp-info", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}), - "", + append(lesp, LE{"Offset", offset}), + func(les LEs) string { + return fmt.Sprintf( + "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size, + ) + }, ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { replies = append(replies, MarshalSP( @@ -824,128 +1307,318 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { SPFreq{info.Hash, uint64(offset)}, )) } + case SPTypeFile: - sdsp := SdsAdd(sds, SDS{"type": "file"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "file"}) + state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ - "err": err, - "type": "file", - }), "") + state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - sdsp["xx"] = string(TRx) - sdsp["hash"] = ToBase32(file.Hash[:]) - sdsp["size"] = strconv.Itoa(len(file.Payload)) - filePath := filepath.Join( + pktName := Base32Codec.EncodeToString(file.Hash[:]) + lesp = append( + lesp, + LE{"XX", string(TRx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(len(file.Payload))}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Got packet %s (%s)", + pktName, humanize.IBytes(uint64(len(file.Payload))), + ) + } + fullsize := int64(0) + state.RLock() + infoTheir := state.infosTheir[*file.Hash] + state.RUnlock() + if infoTheir == nil { + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": unknown file" + }) + continue + } + fullsize = int64(infoTheir.Size) + lesp = append(lesp, LE{"FullSize", fullsize}) + dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), - ToBase32(file.Hash[:]), - ) - state.Ctx.LogD("sp-file", sdsp, "opening part") - fd, err := os.OpenFile( - filePath+PartSuffix, - os.O_RDWR|os.O_CREATE, - os.FileMode(0666), ) - if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - return nil, err + filePath := filepath.Join(dirToSync, pktName) + filePathPart := filePath + PartSuffix + state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening part" + }) + state.fdsLock.RLock() + fdAndFullSize, exists := state.fds[filePathPart] + state.fdsLock.RUnlock() + hasherAndOffset := state.fileHashers[filePath] + var fd *os.File + if exists { + fd = fdAndFullSize.fd + } else { + fd, err = os.OpenFile( + filePathPart, + os.O_RDWR|os.O_CREATE, + os.FileMode(0666), + ) + if err != nil { + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening part" + }) + return nil, err + } + state.fdsLock.Lock() + state.fds[filePathPart] = FdAndFullSize{fd: fd} + state.fdsLock.Unlock() + if !state.NoCK { + hasherAndOffset = &MTHAndOffset{ + mth: MTHNew(fullsize, int64(file.Offset)), + offset: file.Offset, + } + state.fileHashers[filePath] = hasherAndOffset + } } state.Ctx.LogD( - "sp-file", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}), - "seeking", - ) + "sp-file-seek", + append(lesp, LE{"Offset", file.Offset}), + func(les LEs) string { + return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset) + }) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - fd.Close() + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + state.closeFd(filePathPart) return nil, err } - state.Ctx.LogD("sp-file", sdsp, "writing") - _, err = fd.Write(file.Payload) - if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - fd.Close() + state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string { + return logMsg(les) + ": writing" + }) + if _, err = fd.Write(file.Payload); err != nil { + state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string { + return logMsg(les) + ": writing" + }) + state.closeFd(filePathPart) return nil, err } - ourSize := uint64(file.Offset) + uint64(len(file.Payload)) - state.RLock() - sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) - sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - state.Ctx.LogP("sp-file", sdsp, "") - if state.infosTheir[*file.Hash].Size != ourSize { - state.RUnlock() - fd.Close() + if hasherAndOffset != nil { + if hasherAndOffset.offset == file.Offset { + if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil { + panic(err) + } + hasherAndOffset.offset += uint64(len(file.Payload)) + } else { + state.Ctx.LogE( + "sp-file-offset-differs", lesp, errors.New("offset differs"), + func(les LEs) string { + return logMsg(les) + ": deleting hasher" + }, + ) + delete(state.fileHashers, filePath) + hasherAndOffset = nil + } + } + ourSize := int64(file.Offset + uint64(len(file.Payload))) + lesp[len(lesp)-2].V = ourSize + if state.Ctx.ShowPrgrs { + state.progressBars[pktName] = struct{}{} + Progress("Rx", lesp) + } + if fullsize != ourSize { continue } - state.RUnlock() - spWorkersGroup.Wait() - spWorkersGroup.Add(1) + if state.Ctx.ShowPrgrs { + delete(state.progressBars, pktName) + } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Got packet %s %d%% (%s / %s)", + pktName, 100*ourSize/fullsize, + humanize.IBytes(uint64(ourSize)), + humanize.IBytes(uint64(fullsize)), + ) + } + err = fd.Sync() + if err != nil { + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) + state.closeFd(filePathPart) + continue + } + if hasherAndOffset != nil { + delete(state.fileHashers, filePath) + if hasherAndOffset.mth.PreaddSize() == 0 { + if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 { + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) + state.closeFd(filePathPart) + continue + } + if err = os.Rename(filePathPart, filePath); err != nil { + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) + state.closeFd(filePathPart) + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) + state.closeFd(filePathPart) + continue + } + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" + }) + state.wg.Add(1) + go func() { + state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() + }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.Ctx.HdrUsage { + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) + state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) + continue + } + } + state.closeFd(filePathPart) + if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) + continue + } + state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string { + return logMsg(les) + ": downloaded" + }) + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() go func() { - if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") - fd.Close() - return + t := SPCheckerTask{ + nodeId: state.Node.Id, + hsh: file.Hash, + done: state.payloads, } - state.wg.Add(1) - defer state.wg.Done() - fd.Seek(0, io.SeekStart) - state.Ctx.LogD("sp-file", sdsp, "checking") - gut, err := Check(fd, file.Hash[:]) - fd.Close() - if err != nil || !gut { - state.Ctx.LogE("sp-file", sdsp, "checksum mismatch") - return + if hasherAndOffset != nil { + t.mth = hasherAndOffset.mth } - state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") - os.Rename(filePath+PartSuffix, filePath) - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() - spWorkersGroup.Done() - go func() { - state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) - }() + spCheckerTasks <- t }() + case SPTypeDone: - sdsp := SdsAdd(sds, SDS{"type": "done"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "done"}) + state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ - "type": "done", - "err": err, - }), "") + state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - sdsp["hash"] = ToBase32(done.Hash[:]) - state.Ctx.LogD("sp-done", sdsp, "removing") - err := os.Remove(filepath.Join( + pktName := Base32Codec.EncodeToString(done.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): DONE: removing %s", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + } + state.Ctx.LogD("sp-done", lesp, logMsg) + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - ToBase32(done.Hash[:]), - )) - sdsp["xx"] = string(TTx) - if err == nil { - state.Ctx.LogI("sp-done", sdsp, "") + pktName, + ) + if err = os.Remove(pth); err == nil { + state.Ctx.LogI("sp-done", lesp, func(les LEs) string { + return fmt.Sprintf("Packet %s is sent", pktName) + }) + if state.Ctx.HdrUsage { + os.Remove(JobPath2Hdr(pth)) + } } else { - state.Ctx.LogE("sp-done", sdsp, "") + state.Ctx.LogE("sp-done", lesp, err, logMsg) } + case SPTypeFreq: - sdsp := SdsAdd(sds, SDS{"type": "freq"}) - state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") + lesp := append(les, LE{"Type", "freq"}) + state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - sdsp["hash"] = ToBase32(freq.Hash[:]) - sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10) - state.Ctx.LogD("sp-process", sdsp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset}) + state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: queuing", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) nice, exists := state.infosOurSeen[*freq.Hash] if exists { if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { @@ -962,25 +1635,38 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} state.Unlock() } else { - state.Ctx.LogD("sp-process", sdsp, "skipping") + state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: skipping", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } } else { - state.Ctx.LogD("sp-process", sdsp, "unknown") + state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: unknown", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } - case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") - state.Lock() - state.queueTheir = nil - state.Unlock() + default: state.Ctx.LogE( - "sp-process", - SdsAdd(sds, SDS{"type": head.Type}), - "unknown", + "sp-process-type-unknown", + append(les, LE{"Type", head.Type}), + errors.New("unknown type"), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): %d", + state.Node.Name, NicenessFmt(state.Nice), head.Type, + ) + }, ) return nil, BadPktType } } + if infosGot { var pkts int var size uint64 @@ -990,12 +1676,55 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { size += info.Size } state.RUnlock() - state.Ctx.LogI("sp-infos", SDS{ - "xx": string(TRx), - "node": state.Node.Id, - "pkts": strconv.Itoa(pkts), - "size": strconv.FormatInt(int64(size), 10), - }, "") + state.Ctx.LogI("sp-infos-rx", LEs{ + {"XX", string(TRx)}, + {"Node", state.Node.Id}, + {"Pkts", pkts}, + {"Size", int64(size)}, + }, func(les LEs) string { + return fmt.Sprintf( + "%s has got for us: %d packets, %s", + state.Node.Name, pkts, humanize.IBytes(size), + ) + }) } return payloadsSplit(replies), nil } + +func SPChecker(ctx *Ctx) { + for t := range spCheckerTasks { + pktName := Base32Codec.EncodeToString(t.hsh[:]) + les := LEs{ + {"XX", string(TRx)}, + {"Node", t.nodeId}, + {"Pkt", pktName}, + } + SPCheckerWg.Add(1) + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName) + }) + size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth) + les = append(les, LE{"Size", size}) + if err != nil { + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) + SPCheckerWg.Done() + continue + } + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) + SPCheckerWg.Done() + go func(t SPCheckerTask) { + defer func() { recover() }() + t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh}) + }(t) + } +}