X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=94b37ff624893f8560a7d4b0f1f0f3ed827c4eff;hb=4e08a1c97600e0372680e86a651f916c70e89342;hp=540e0e52a22a4f016114ca8a9fed71599802ed54;hpb=2ac16acad8de20005793cbadd61068d38dc3b0ca;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 540e0e5..94b37ff 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2019 Sergey Matveev +Copyright (C) 2016-2020 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -22,11 +22,9 @@ import ( "crypto/subtle" "errors" "io" - "net" "os" "path/filepath" "sort" - "strconv" "sync" "time" @@ -35,19 +33,19 @@ import ( ) const ( - MaxSPSize = 1<<16 - 256 - PartSuffix = ".part" - DefaultDeadline = 10 + MaxSPSize = 1<<16 - 256 + PartSuffix = ".part" + SPHeadOverhead = 4 ) var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} - SPHeadOverhead int SPInfoOverhead int SPFreqOverhead int SPFileOverhead int SPHaltMarshalized []byte + SPPingMarshalized []byte NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite( noise.DH25519, @@ -55,6 +53,9 @@ var ( noise.HashBLAKE2b, ) + DefaultDeadline = 10 * time.Second + PingTimeout = time.Minute + spWorkersGroup sync.WaitGroup ) @@ -66,6 +67,7 @@ const ( SPTypeFile SPType = iota SPTypeDone SPType = iota SPTypeHalt SPType = iota + SPTypePing SPType = iota ) type SPHead struct { @@ -115,8 +117,16 @@ func init() { if _, err := xdr.Marshal(&buf, spHead); err != nil { panic(err) } + SPHaltMarshalized = make([]byte, SPHeadOverhead) copy(SPHaltMarshalized, buf.Bytes()) - SPHeadOverhead = buf.Len() + buf.Reset() + + spHead = SPHead{Type: SPTypePing} + if _, err := xdr.Marshal(&buf, spHead); err != nil { + panic(err) + } + SPPingMarshalized = make([]byte, SPHeadOverhead) + copy(SPPingMarshalized, buf.Bytes()) buf.Reset() spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)} @@ -142,11 +152,10 @@ func init() { func MarshalSP(typ SPType, sp interface{}) []byte { var buf bytes.Buffer - var err error - if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil { + if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil { panic(err) } - if _, err = xdr.Marshal(&buf, sp); err != nil { + if _, err := xdr.Marshal(&buf, sp); err != nil { panic(err) } return buf.Bytes() @@ -172,21 +181,25 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 - onlineDeadline uint - maxOnlineTime uint + onlineDeadline time.Duration + maxOnlineTime time.Duration hs *noise.HandshakeState csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte + pings chan struct{} infosTheir map[[32]byte]*SPInfo infosOurSeen map[[32]byte]uint8 queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time + RxLastNonPing time.Time TxBytes int64 TxLastSeen time.Time + TxLastNonPing time.Time started time.Time + mustFinishAt time.Time Duration time.Duration RxSpeed int64 TxSpeed int64 @@ -195,22 +208,40 @@ type SPState struct { xxOnly TRxTx rxRate int txRate int - isDead bool + isDead chan struct{} listOnly bool onlyPkts map[[32]byte]bool + writeSPBuf bytes.Buffer sync.RWMutex } -func (state *SPState) NotAlive() bool { - if state.isDead { - return true +func (state *SPState) SetDead() { + state.Lock() + defer state.Unlock() + select { + case <-state.isDead: + // Already closed channel, dead + return + default: } - now := time.Now() - if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { + close(state.isDead) + go func() { + for _ = range state.payloads { + } + }() + go func() { + for _ = range state.pings { + } + }() +} + +func (state *SPState) NotAlive() bool { + select { + case <-state.isDead: return true + default: } - return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && - uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline + return false } func (state *SPState) dirUnlock() { @@ -218,11 +249,21 @@ func (state *SPState) dirUnlock() { state.Ctx.UnlockDir(state.txLock) } -func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { - n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload}) - if err == nil { +func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { + state.writeSPBuf.Reset() + n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ + Magic: MagicNNCPLv1, + Payload: payload, + }) + if err != nil { + return err + } + if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil { state.TxLastSeen = time.Now() state.TxBytes += int64(n) + if !ping { + state.TxLastNonPing = state.TxLastSeen + } } return err } @@ -231,6 +272,10 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { var sp SPRaw n, err := xdr.UnmarshalLimited(src, &sp, 1<<17) if err != nil { + ue := err.(*xdr.UnmarshalError) + if ue.Err == io.EOF { + return nil, ue.Err + } return nil, err } state.RxLastSeen = time.Now() @@ -245,7 +290,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() + job.Fd.Close() // #nosec G104 if job.PktEnc.Nice > nice { continue } @@ -266,16 +311,16 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ payloads = append(payloads, MarshalSP(SPTypeInfo, info)) ctx.LogD("sp-info-our", SDS{ "node": nodeId, - "name": ToBase32(info.Hash[:]), - "size": strconv.FormatInt(int64(info.Size), 10), + "name": Base32Codec.EncodeToString(info.Hash[:]), + "size": info.Size, }, "") } if totalSize > 0 { ctx.LogI("sp-infos", SDS{ "xx": string(TTx), "node": nodeId, - "pkts": strconv.Itoa(len(payloads)), - "size": strconv.FormatInt(totalSize, 10), + "pkts": len(payloads), + "size": totalSize, }, "") } return payloadsSplit(payloads) @@ -289,14 +334,14 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } var rxLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) { - rxLock, err = state.Ctx.LockDir(nodeId, TRx) + rxLock, err = state.Ctx.LockDir(nodeId, string(TRx)) if err != nil { return err } } var txLock *os.File if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { - txLock, err = state.Ctx.LockDir(nodeId, TTx) + txLock, err = state.Ctx.LockDir(nodeId, string(TTx)) if err != nil { return err } @@ -318,6 +363,7 @@ func (state *SPState) StartI(conn ConnDeadlined) error { } state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosTheir = make(map[[32]byte]*SPInfo) state.infosOurSeen = make(map[[32]byte]uint8) state.started = started @@ -344,33 +390,32 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.dirUnlock() return err } - sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": nodeId, "nice": int(state.Nice)} state.Ctx.LogD("sp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + if err = state.WriteSP(conn, buf, false); err != nil { + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } state.Ctx.LogD("sp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } state.Ctx.LogD("sp-start", sds, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() - return err } return err } @@ -393,36 +438,34 @@ func (state *SPState) StartR(conn ConnDeadlined) error { xxOnly := TRxTx("") state.hs = hs state.payloads = make(chan []byte) + state.pings = make(chan struct{}) state.infosOurSeen = make(map[[32]byte]uint8) state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started state.xxOnly = xxOnly var buf []byte var payload []byte - state.Ctx.LogD( - "sp-start", - SDS{"nice": strconv.Itoa(int(state.Nice))}, - "waiting for first message", - ) - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message") + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SDS{"err": err}, "") + state.Ctx.LogE("sp-start", SDS{}, err, "") return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", SDS{"err": err}, "") + state.Ctx.LogE("sp-start", SDS{}, err, "") return err } var node *Node - for _, node = range state.Ctx.Neigh { - if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 { + for _, n := range state.Ctx.Neigh { + if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 { + node = n break } } if node == nil { - peerId := ToBase32(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") + peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) + state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "") return errors.New("Unknown peer: " + peerId) } state.Node = node @@ -430,14 +473,14 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": node.Id, "nice": int(state.Nice)} - if state.Ctx.ensureRxDir(node.Id); err != nil { + if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err } var rxLock *os.File if xxOnly == "" || xxOnly == TRx { - rxLock, err = state.Ctx.LockDir(node.Id, TRx) + rxLock, err = state.Ctx.LockDir(node.Id, string(TRx)) if err != nil { return err } @@ -445,7 +488,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.rxLock = rxLock var txLock *os.File if xxOnly == "" || xxOnly == TTx { - txLock, err = state.Ctx.LockDir(node.Id, TTx) + txLock, err = state.Ctx.LockDir(node.Id, string(TTx)) if err != nil { return err } @@ -471,9 +514,9 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.dirUnlock() return err } - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err = state.WriteSP(conn, buf); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + if err = state.WriteSP(conn, buf, false); err != nil { + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } @@ -481,7 +524,6 @@ func (state *SPState) StartR(conn ConnDeadlined) error { err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() - return err } return err } @@ -489,154 +531,203 @@ func (state *SPState) StartR(conn ConnDeadlined) error { func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, - payload []byte) error { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} + payload []byte, +) error { + sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} + state.isDead = make(chan struct{}) + if state.maxOnlineTime > 0 { + state.mustFinishAt = state.started.Add(state.maxOnlineTime) + } + + // Remaining handshake payload sending if len(infosPayloads) > 1 { + state.wg.Add(1) go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "queuing remaining payload", ) state.payloads <- payload } + state.wg.Done() }() } + + // Processing of first payload and queueing its responses state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "processing first payload", ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-work", sds, err, "") return err } - + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), + SdsAdd(sds, SDS{"size": len(reply)}), "queuing reply", ) state.payloads <- reply } + state.wg.Done() }() + // Periodic jobs + state.wg.Add(1) + go func() { + deadlineTicker := time.NewTicker(time.Second) + pingTicker := time.NewTicker(PingTimeout) + for { + select { + case <-state.isDead: + state.wg.Done() + deadlineTicker.Stop() + pingTicker.Stop() + return + case now := <-deadlineTicker.C: + if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline && + now.Sub(state.TxLastNonPing) >= state.onlineDeadline) || + (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) || + (now.Sub(state.RxLastSeen) >= 2*PingTimeout) { + state.SetDead() + conn.Close() // #nosec G104 + } + case now := <-pingTicker.C: + if now.After(state.TxLastSeen.Add(PingTimeout)) { + state.wg.Add(1) + go func() { + state.pings <- struct{}{} + state.wg.Done() + }() + } + } + } + }() + + // Spool checker and INFOs sender of appearing files if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { + state.wg.Add(1) go func() { - for range time.Tick(time.Second) { - if state.NotAlive() { + ticker := time.NewTicker(time.Second) + for { + select { + case <-state.isDead: + state.wg.Done() + ticker.Stop() return - } - for _, payload := range state.Ctx.infosOur( - state.Node.Id, - state.Nice, - &state.infosOurSeen, - ) { - state.Ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "queuing new info", - ) - state.payloads <- payload + case <-ticker.C: + for _, payload := range state.Ctx.infosOur( + state.Node.Id, + state.Nice, + &state.infosOurSeen, + ) { + state.Ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": len(payload)}), + "queuing new info", + ) + state.payloads <- payload + } } } }() } + // Sender state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() + defer conn.Close() + defer state.SetDead() + defer state.wg.Done() for { if state.NotAlive() { return } var payload []byte + var ping bool select { + case <-state.pings: + state.Ctx.LogD("sp-xmit", sds, "got ping") + payload = SPPingMarshalized + ping = true case payload = <-state.payloads: state.Ctx.LogD( "sp-xmit", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "got payload", ) default: - } - if payload == nil { state.RLock() if len(state.queueTheir) == 0 { - state.Ctx.LogD("sp-xmit", sds, "file queue is empty") state.RUnlock() time.Sleep(100 * time.Millisecond) continue } freq := state.queueTheir[0].freq state.RUnlock() - if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), - "hash": ToBase32(freq.Hash[:]), - "size": strconv.FormatInt(int64(freq.Offset), 10), + "pkt": Base32Codec.EncodeToString(freq.Hash[:]), + "size": int64(freq.Offset), }) state.Ctx.LogD("sp-file", sdsp, "queueing") fd, err := os.Open(filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - ToBase32(freq.Hash[:]), + Base32Codec.EncodeToString(freq.Hash[:]), )) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + state.Ctx.LogE("sp-file", sdsp, err, "") + return } fi, err := fd.Stat() if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + state.Ctx.LogE("sp-file", sdsp, err, "") + return } - fullSize := uint64(fi.Size()) + fullSize := fi.Size() var buf []byte - if freq.Offset < fullSize { + if freq.Offset < uint64(fullSize) { state.Ctx.LogD("sp-file", sdsp, "seeking") if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + state.Ctx.LogE("sp-file", sdsp, err, "") + return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - break + state.Ctx.LogE("sp-file", sdsp, err, "") + return } buf = buf[:n] - state.Ctx.LogD( - "sp-file", - SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}), - "read", - ) + state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read") } - fd.Close() + fd.Close() // #nosec G104 payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10) - state.Ctx.LogP("sp-file", sdsp, "") + sdsp["size"] = int64(ourSize) + sdsp["fullsize"] = fullSize + if state.Ctx.ShowPrgrs { + Progress("Tx", sdsp) + } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { - if ourSize == fullSize { + if ourSize == uint64(fullSize) { state.Ctx.LogD("sp-file", sdsp, "finished") if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] @@ -651,78 +742,79 @@ func (state *SPState) StartWorkers( } state.Unlock() } - state.Ctx.LogD( - "sp-xmit", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "sending", - ) - conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) - if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { - state.Ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") - break + state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending") + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 + if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { + state.Ctx.LogE("sp-xmit", sds, err, "") + return } } }() + // Receiver state.wg.Add(1) go func() { - defer func() { - state.isDead = true - state.wg.Done() - }() for { if state.NotAlive() { - return + break } state.Ctx.LogD("sp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { + if err == io.EOF { + break + } unmarshalErr := err.(*xdr.UnmarshalError) - netErr, ok := unmarshalErr.Err.(net.Error) - if ok && netErr.Timeout() { + if os.IsTimeout(unmarshalErr.Err) { continue } if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", sds, err, "") break } state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "got payload", ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", sds, err, "") break } state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "processing", ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", sds, err, "") break } + state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), + SdsAdd(sds, SDS{"size": len(reply)}), "queuing reply", ) state.payloads <- reply } + state.wg.Done() }() if state.rxRate > 0 { time.Sleep(time.Second / time.Duration(state.rxRate)) } } + state.SetDead() + state.wg.Done() + state.SetDead() + conn.Close() // #nosec G104 }() return nil @@ -730,6 +822,8 @@ func (state *SPState) StartWorkers( func (state *SPState) Wait() { state.wg.Wait() + close(state.payloads) + close(state.pings) state.dirUnlock() state.Duration = time.Now().Sub(state.started) state.RxSpeed = state.RxBytes @@ -745,7 +839,7 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} r := bytes.NewReader(payload) var err error var replies [][]byte @@ -754,23 +848,33 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sds, "unmarshaling header") var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", sds, err, "") return nil, err } + if head.Type != SPTypePing { + state.RxLastNonPing = state.RxLastSeen + } switch head.Type { + case SPTypeHalt: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") + state.Lock() + state.queueTheir = nil + state.Unlock() + case SPTypePing: + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "") case SPTypeInfo: infosGot = true sdsp := SdsAdd(sds, SDS{"type": "info"}) state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", sdsp, err, "") return nil, err } sdsp = SdsAdd(sds, SDS{ - "hash": ToBase32(info.Hash[:]), - "size": strconv.FormatInt(int64(info.Size), 10), - "nice": strconv.Itoa(int(info.Nice)), + "pkt": Base32Codec.EncodeToString(info.Hash[:]), + "size": int64(info.Size), + "nice": int(info.Nice), }) if !state.listOnly && info.Nice > state.Nice { state.Ctx.LogD("sp-process", sdsp, "too nice") @@ -788,7 +892,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.Spool, state.Node.Id.String(), string(TRx), - ToBase32(info.Hash[:]), + Base32Codec.EncodeToString(info.Hash[:]), ) if _, err = os.Stat(pktPath); err == nil { state.Ctx.LogI("sp-info", sdsp, "already done") @@ -815,7 +919,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.Ctx.LogI( "sp-info", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}), + SdsAdd(sdsp, SDS{"offset": offset}), "", ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { @@ -829,21 +933,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ - "err": err, - "type": "file", - }), "") + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "") return nil, err } sdsp["xx"] = string(TRx) - sdsp["hash"] = ToBase32(file.Hash[:]) - sdsp["size"] = strconv.Itoa(len(file.Payload)) - filePath := filepath.Join( + sdsp["pkt"] = Base32Codec.EncodeToString(file.Hash[:]) + sdsp["size"] = len(file.Payload) + dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), - ToBase32(file.Hash[:]), ) + filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) state.Ctx.LogD("sp-file", sdsp, "opening part") fd, err := os.OpenFile( filePath+PartSuffix, @@ -851,63 +952,82 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { os.FileMode(0666), ) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") return nil, err } state.Ctx.LogD( "sp-file", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}), + SdsAdd(sdsp, SDS{"offset": file.Offset}), "seeking", ) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - fd.Close() + state.Ctx.LogE("sp-file", sdsp, err, "") + fd.Close() // #nosec G104 return nil, err } state.Ctx.LogD("sp-file", sdsp, "writing") _, err = fd.Write(file.Payload) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") - fd.Close() + state.Ctx.LogE("sp-file", sdsp, err, "") + fd.Close() // #nosec G104 return nil, err } - ourSize := uint64(file.Offset) + uint64(len(file.Payload)) + ourSize := int64(file.Offset + uint64(len(file.Payload))) + sdsp["size"] = ourSize + fullsize := int64(0) state.RLock() - sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) - sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - state.Ctx.LogP("sp-file", sdsp, "") - if state.infosTheir[*file.Hash].Size != ourSize { - state.RUnlock() - fd.Close() + infoTheir, ok := state.infosTheir[*file.Hash] + state.RUnlock() + if ok { + fullsize = int64(infoTheir.Size) + } + sdsp["fullsize"] = fullsize + if state.Ctx.ShowPrgrs { + Progress("Rx", sdsp) + } + if fullsize != ourSize { + fd.Close() // #nosec G104 continue } - state.RUnlock() spWorkersGroup.Wait() spWorkersGroup.Add(1) go func() { if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") - fd.Close() + state.Ctx.LogE("sp-file", sdsp, err, "sync") + fd.Close() // #nosec G104 return } state.wg.Add(1) defer state.wg.Done() - fd.Seek(0, io.SeekStart) + if _, err = fd.Seek(0, io.SeekStart); err != nil { + fd.Close() // #nosec G104 + state.Ctx.LogE("sp-file", sdsp, err, "") + return + } state.Ctx.LogD("sp-file", sdsp, "checking") - gut, err := Check(fd, file.Hash[:]) - fd.Close() + gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs) + fd.Close() // #nosec G104 if err != nil || !gut { - state.Ctx.LogE("sp-file", sdsp, "checksum mismatch") + state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "") return } state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") - os.Rename(filePath+PartSuffix, filePath) + if err = os.Rename(filePath+PartSuffix, filePath); err != nil { + state.Ctx.LogE("sp-file", sdsp, err, "rename") + return + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file", sdsp, err, "sync") + return + } state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() spWorkersGroup.Done() + state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) + state.wg.Done() }() }() case SPTypeDone: @@ -915,36 +1035,33 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ - "type": "done", - "err": err, - }), "") + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "") return nil, err } - sdsp["hash"] = ToBase32(done.Hash[:]) + sdsp["pkt"] = Base32Codec.EncodeToString(done.Hash[:]) state.Ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - ToBase32(done.Hash[:]), + Base32Codec.EncodeToString(done.Hash[:]), )) sdsp["xx"] = string(TTx) if err == nil { state.Ctx.LogI("sp-done", sdsp, "") } else { - state.Ctx.LogE("sp-done", sdsp, "") + state.Ctx.LogE("sp-done", sdsp, err, "") } case SPTypeFreq: sdsp := SdsAdd(sds, SDS{"type": "freq"}) state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", sdsp, err, "") return nil, err } - sdsp["hash"] = ToBase32(freq.Hash[:]) - sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10) + sdsp["pkt"] = Base32Codec.EncodeToString(freq.Hash[:]) + sdsp["offset"] = freq.Offset state.Ctx.LogD("sp-process", sdsp, "queueing") nice, exists := state.infosOurSeen[*freq.Hash] if exists { @@ -967,16 +1084,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } else { state.Ctx.LogD("sp-process", sdsp, "unknown") } - case SPTypeHalt: - state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") - state.Lock() - state.queueTheir = nil - state.Unlock() default: state.Ctx.LogE( "sp-process", SdsAdd(sds, SDS{"type": head.Type}), - "unknown", + errors.New("unknown type"), + "", ) return nil, BadPktType } @@ -993,8 +1106,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogI("sp-infos", SDS{ "xx": string(TRx), "node": state.Node.Id, - "pkts": strconv.Itoa(pkts), - "size": strconv.FormatInt(int64(size), 10), + "pkts": pkts, + "size": int64(size), }, "") } return payloadsSplit(replies), nil