X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fcypherpunks.ru%2Fnncp%2Fsp.go;h=7e2f7c3be104029977f94fa6e1cdc7b52aa89273;hb=dd92823db3d72fb21a4c712a7fb052dce16443dd;hp=6daab72c643c4eb3f7df571c0c205da48abdf74d;hpb=38c5c9cdfcb75e3a80d0c22d3991f62dad022f1e;p=nncp.git diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 6daab72..7e2f7c3 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -1,11 +1,10 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2017 Sergey Matveev +Copyright (C) 2016-2019 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. +the Free Software Foundation, version 3 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of @@ -99,6 +98,17 @@ type SPRaw struct { Payload []byte } +type FreqWithNice struct { + freq *SPFreq + nice uint8 +} + +type ConnDeadlined interface { + io.ReadWriter + SetReadDeadline(t time.Time) error + SetWriteDeadline(t time.Time) error +} + func init() { var buf bytes.Buffer spHead := SPHead{Type: SPTypeHalt} @@ -159,18 +169,18 @@ func payloadsSplit(payloads [][]byte) [][]byte { } type SPState struct { - ctx *Ctx + Ctx *Ctx Node *Node + Nice uint8 onlineDeadline uint maxOnlineTime uint - nice uint8 hs *noise.HandshakeState csOur *noise.CipherState csTheir *noise.CipherState payloads chan []byte infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]struct{} - queueTheir []*SPFreq + infosOurSeen map[[32]byte]uint8 + queueTheir []*FreqWithNice wg sync.WaitGroup RxBytes int64 RxLastSeen time.Time @@ -183,7 +193,11 @@ type SPState struct { rxLock *os.File txLock *os.File xxOnly TRxTx + rxRate int + txRate int isDead bool + listOnly bool + onlyPkts map[[32]byte]bool sync.RWMutex } @@ -195,12 +209,13 @@ func (state *SPState) NotAlive() bool { if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { return true } - return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline + return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && + uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline } func (state *SPState) dirUnlock() { - state.ctx.UnlockDir(state.rxLock) - state.ctx.UnlockDir(state.txLock) + state.Ctx.UnlockDir(state.rxLock) + state.Ctx.UnlockDir(state.txLock) } func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { @@ -226,7 +241,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { @@ -243,7 +258,7 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{} Size: uint64(job.Size), Hash: job.HshValue, }) - (*seen)[*job.HshValue] = struct{}{} + (*seen)[*job.HshValue] = job.PktEnc.Nice } sort.Sort(ByNice(infos)) var payloads [][]byte @@ -266,56 +281,52 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{} return payloadsSplit(payloads) } -func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) { - err := ctx.ensureRxDir(nodeId) +func (state *SPState) StartI(conn ConnDeadlined) error { + nodeId := state.Node.Id + err := state.Ctx.ensureRxDir(nodeId) if err != nil { - return nil, err + return err } var rxLock *os.File - if xxOnly == "" || xxOnly == TRx { - rxLock, err = ctx.LockDir(nodeId, TRx) + if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) { + rxLock, err = state.Ctx.LockDir(nodeId, TRx) if err != nil { - return nil, err + return err } } var txLock *os.File - if xxOnly == "" || xxOnly == TTx { - txLock, err = ctx.LockDir(nodeId, TTx) + if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { + txLock, err = state.Ctx.LockDir(nodeId, TTx) if err != nil { - return nil, err + return err } } started := time.Now() - node := ctx.Neigh[*nodeId] conf := noise.Config{ CipherSuite: NoiseCipherSuite, Pattern: noise.HandshakeIK, Initiator: true, StaticKeypair: noise.DHKey{ - Private: ctx.Self.NoisePrv[:], - Public: ctx.Self.NoisePub[:], + Private: state.Ctx.Self.NoisePrv[:], + Public: state.Ctx.Self.NoisePub[:], }, - PeerStatic: node.NoisePub[:], - } - state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - Node: node, - onlineDeadline: onlineDeadline, - maxOnlineTime: maxOnlineTime, - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - infosOurSeen: make(map[[32]byte]struct{}), - started: started, - rxLock: rxLock, - txLock: txLock, - xxOnly: xxOnly, + PeerStatic: state.Node.NoisePub[:], } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return err + } + state.hs = hs + state.payloads = make(chan []byte) + state.infosTheir = make(map[[32]byte]*SPInfo) + state.infosOurSeen = make(map[[32]byte]uint8) + state.started = started + state.rxLock = rxLock + state.txLock = txLock var infosPayloads [][]byte - if xxOnly == "" || xxOnly == TTx { - infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) + if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { + infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { @@ -328,116 +339,122 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, var buf []byte var payload []byte - buf, _, _ = state.hs.WriteMessage(nil, firstPayload) - sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} - ctx.LogD("sp-start", sds, "sending first message") + buf, _, _, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return err + } + sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(state.Nice))} + state.Ctx.LogD("sp-start", sds, "sending first message") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { - ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() - return nil, err + return err } - ctx.LogD("sp-start", sds, "waiting for first message") + state.Ctx.LogD("sp-start", sds, "waiting for first message") conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { - ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() - return nil, err + return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() - return nil, err + return err } - ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-start", sds, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() - return nil, err + return err } - return &state, err + return err } -func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) { +func (state *SPState) StartR(conn ConnDeadlined) error { started := time.Now() conf := noise.Config{ CipherSuite: NoiseCipherSuite, Pattern: noise.HandshakeIK, Initiator: false, StaticKeypair: noise.DHKey{ - Private: ctx.Self.NoisePrv[:], - Public: ctx.Self.NoisePub[:], + Private: state.Ctx.Self.NoisePrv[:], + Public: state.Ctx.Self.NoisePub[:], }, } - state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - nice: nice, - payloads: make(chan []byte), - infosOurSeen: make(map[[32]byte]struct{}), - infosTheir: make(map[[32]byte]*SPInfo), - started: started, - xxOnly: xxOnly, + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return err } + xxOnly := TRxTx("") + state.hs = hs + state.payloads = make(chan []byte) + state.infosOurSeen = make(map[[32]byte]uint8) + state.infosTheir = make(map[[32]byte]*SPInfo) + state.started = started + state.xxOnly = xxOnly var buf []byte var payload []byte - var err error - ctx.LogD( + state.Ctx.LogD( "sp-start", - SDS{"nice": strconv.Itoa(int(nice))}, + SDS{"nice": strconv.Itoa(int(state.Nice))}, "waiting for first message", ) conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { - ctx.LogE("sp-start", SDS{"err": err}, "") - return nil, err + state.Ctx.LogE("sp-start", SDS{"err": err}, "") + return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - ctx.LogE("sp-start", SDS{"err": err}, "") - return nil, err + state.Ctx.LogE("sp-start", SDS{"err": err}, "") + return err } var node *Node - for _, node = range ctx.Neigh { + for _, node = range state.Ctx.Neigh { if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 { break } } if node == nil { peerId := ToBase32(state.hs.PeerStatic()) - ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") - return nil, errors.New("Unknown peer: " + peerId) + state.Ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") + return errors.New("Unknown peer: " + peerId) } state.Node = node + state.rxRate = node.RxRate + state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))} + sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(state.Nice))} - if ctx.ensureRxDir(node.Id); err != nil { - return nil, err + if state.Ctx.ensureRxDir(node.Id); err != nil { + return err } var rxLock *os.File if xxOnly == "" || xxOnly == TRx { - rxLock, err = ctx.LockDir(node.Id, TRx) + rxLock, err = state.Ctx.LockDir(node.Id, TRx) if err != nil { - return nil, err + return err } } state.rxLock = rxLock var txLock *os.File if xxOnly == "" || xxOnly == TTx { - txLock, err = ctx.LockDir(node.Id, TTx) + txLock, err = state.Ctx.LockDir(node.Id, TTx) if err != nil { - return nil, err + return err } } state.txLock = txLock var infosPayloads [][]byte if xxOnly == "" || xxOnly == TTx { - infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen) + infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { @@ -448,29 +465,36 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error firstPayload = append(firstPayload, SPHaltMarshalized...) } - ctx.LogD("sp-start", sds, "sending first message") - buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) + state.Ctx.LogD("sp-start", sds, "sending first message") + buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return err + } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { - ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() - return nil, err + return err } - ctx.LogD("sp-start", sds, "starting workers") + state.Ctx.LogD("sp-start", sds, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() - return nil, err + return err } - return &state, err + return err } -func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))} +func (state *SPState) StartWorkers( + conn ConnDeadlined, + infosPayloads [][]byte, + payload []byte) error { + sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} if len(infosPayloads) > 1 { go func() { for _, payload := range infosPayloads[1:] { - state.ctx.LogD( + state.Ctx.LogD( "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "queuing remaining payload", @@ -479,20 +503,20 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa } }() } - state.ctx.LogD( + state.Ctx.LogD( "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "processing first payload", ) replies, err := state.ProcessSP(payload) if err != nil { - state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") return err } go func() { for _, reply := range replies { - state.ctx.LogD( + state.Ctx.LogD( "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), "queuing reply", @@ -501,15 +525,18 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa } }() - if state.xxOnly == "" || state.xxOnly == TTx { + if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) { go func() { for range time.Tick(time.Second) { - for _, payload := range state.ctx.infosOur( + if state.NotAlive() { + return + } + for _, payload := range state.Ctx.infosOur( state.Node.Id, - state.nice, + state.Nice, &state.infosOurSeen, ) { - state.ctx.LogD( + state.Ctx.LogD( "sp-work", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "queuing new info", @@ -533,7 +560,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa var payload []byte select { case payload = <-state.payloads: - state.ctx.LogD( + state.Ctx.LogD( "sp-xmit", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "got payload", @@ -543,50 +570,55 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa if payload == nil { state.RLock() if len(state.queueTheir) == 0 { - state.ctx.LogD("sp-xmit", sds, "file queue is empty") + state.Ctx.LogD("sp-xmit", sds, "file queue is empty") state.RUnlock() time.Sleep(100 * time.Millisecond) continue } - freq := state.queueTheir[0] + freq := state.queueTheir[0].freq state.RUnlock() + + if state.txRate > 0 { + time.Sleep(time.Second / time.Duration(state.txRate)) + } + sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), "hash": ToBase32(freq.Hash[:]), "size": strconv.FormatInt(int64(freq.Offset), 10), }) - state.ctx.LogD("sp-file", sdsp, "queueing") + state.Ctx.LogD("sp-file", sdsp, "queueing") fd, err := os.Open(filepath.Join( - state.ctx.Spool, + state.Ctx.Spool, state.Node.Id.String(), string(TTx), ToBase32(freq.Hash[:]), )) if err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } fi, err := fd.Stat() if err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } fullSize := uint64(fi.Size()) var buf []byte if freq.Offset < fullSize { - state.ctx.LogD("sp-file", sdsp, "seeking") - if _, err = fd.Seek(int64(freq.Offset), 0); err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogD("sp-file", sdsp, "seeking") + if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } buf = buf[:n] - state.ctx.LogD( + state.Ctx.LogD( "sp-file", SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}), "read", @@ -601,32 +633,32 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa ourSize := freq.Offset + uint64(len(buf)) sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10) - state.ctx.LogP("sp-file", sdsp, "") + state.Ctx.LogP("sp-file", sdsp, "") state.Lock() - if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash { + if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == fullSize { - state.ctx.LogD("sp-file", sdsp, "finished") + state.Ctx.LogD("sp-file", sdsp, "finished") if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { state.queueTheir = state.queueTheir[:0] } } else { - state.queueTheir[0].Offset += uint64(len(buf)) + state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { - state.ctx.LogD("sp-file", sdsp, "queue disappeared") + state.Ctx.LogD("sp-file", sdsp, "queue disappeared") } state.Unlock() } - state.ctx.LogD( + state.Ctx.LogD( "sp-xmit", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "sending", ) - conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { - state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") break } } @@ -642,7 +674,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa if state.NotAlive() { return } - state.ctx.LogD("sp-recv", sds, "waiting for payload") + state.Ctx.LogD("sp-recv", sds, "waiting for payload") conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) payload, err := state.ReadSP(conn) if err != nil { @@ -654,32 +686,32 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") break } - state.ctx.LogD( + state.Ctx.LogD( "sp-recv", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "got payload", ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") break } - state.ctx.LogD( + state.Ctx.LogD( "sp-recv", SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "processing", ) replies, err := state.ProcessSP(payload) if err != nil { - state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") break } go func() { for _, reply := range replies { - state.ctx.LogD( + state.Ctx.LogD( "sp-recv", SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), "queuing reply", @@ -687,6 +719,9 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.payloads <- reply } }() + if state.rxRate > 0 { + time.Sleep(time.Second / time.Duration(state.rxRate)) + } } }() @@ -710,122 +745,129 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))} + sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} r := bytes.NewReader(payload) var err error var replies [][]byte var infosGot bool for r.Len() > 0 { - state.ctx.LogD("sp-process", sds, "unmarshaling header") + state.Ctx.LogD("sp-process", sds, "unmarshaling header") var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "") return nil, err } switch head.Type { case SPTypeInfo: infosGot = true sdsp := SdsAdd(sds, SDS{"type": "info"}) - state.ctx.LogD("sp-process", sdsp, "unmarshaling packet") + state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") return nil, err } sdsp = SdsAdd(sds, SDS{ "hash": ToBase32(info.Hash[:]), "size": strconv.FormatInt(int64(info.Size), 10), + "nice": strconv.Itoa(int(info.Nice)), }) - if info.Nice > state.nice { - state.ctx.LogD("sp-process", sdsp, "too nice") + if !state.listOnly && info.Nice > state.Nice { + state.Ctx.LogD("sp-process", sdsp, "too nice") continue } - state.ctx.LogD("sp-process", sdsp, "received") - if state.xxOnly == TTx { + state.Ctx.LogD("sp-process", sdsp, "received") + if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.ctx.LogD("sp-process", sdsp, "stating part") - if _, err = os.Stat(filepath.Join( - state.ctx.Spool, + state.Ctx.LogD("sp-process", sdsp, "stating part") + pktPath := filepath.Join( + state.Ctx.Spool, state.Node.Id.String(), string(TRx), ToBase32(info.Hash[:]), - )); err == nil { - state.ctx.LogD("sp-process", sdsp, "already done") - replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) + ) + if _, err = os.Stat(pktPath); err == nil { + state.Ctx.LogI("sp-info", sdsp, "already done") + if !state.listOnly { + replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) + } continue } - fi, err := os.Stat(filepath.Join( - state.ctx.Spool, - state.Node.Id.String(), - string(TRx), - ToBase32(info.Hash[:])+PartSuffix, - )) + if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + state.Ctx.LogI("sp-info", sdsp, "already seen") + if !state.listOnly { + replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) + } + continue + } + fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { offset = fi.Size() - state.ctx.LogD( - "sp-process", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}), - "part exists", - ) } - replies = append(replies, MarshalSP( - SPTypeFreq, - SPFreq{info.Hash, uint64(offset)}, - )) - case SPTypeFile: - state.ctx.LogD( - "sp-process", - SdsAdd(sds, SDS{"type": "file"}), - "unmarshaling packet", + if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { + state.Ctx.LogI("sp-info", sdsp, "not enough space") + continue + } + state.Ctx.LogI( + "sp-info", + SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}), + "", ) + if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { + replies = append(replies, MarshalSP( + SPTypeFreq, + SPFreq{info.Hash, uint64(offset)}, + )) + } + case SPTypeFile: + sdsp := SdsAdd(sds, SDS{"type": "file"}) + state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.ctx.LogE("sp-process", SdsAdd(sds, SDS{ + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ "err": err, "type": "file", }), "") return nil, err } - sdsp := SdsAdd(sds, SDS{ - "xx": string(TRx), - "hash": ToBase32(file.Hash[:]), - "size": strconv.Itoa(len(file.Payload)), - }) + sdsp["xx"] = string(TRx) + sdsp["hash"] = ToBase32(file.Hash[:]) + sdsp["size"] = strconv.Itoa(len(file.Payload)) filePath := filepath.Join( - state.ctx.Spool, + state.Ctx.Spool, state.Node.Id.String(), string(TRx), ToBase32(file.Hash[:]), ) - state.ctx.LogD("sp-file", sdsp, "opening part") + state.Ctx.LogD("sp-file", sdsp, "opening part") fd, err := os.OpenFile( filePath+PartSuffix, os.O_RDWR|os.O_CREATE, os.FileMode(0600), ) if err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") return nil, err } - state.ctx.LogD( + state.Ctx.LogD( "sp-file", SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}), "seeking", ) - if _, err = fd.Seek(int64(file.Offset), 0); err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") fd.Close() return nil, err } - state.ctx.LogD("sp-file", sdsp, "writing") + state.Ctx.LogD("sp-file", sdsp, "writing") _, err = fd.Write(file.Payload) if err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") fd.Close() return nil, err } @@ -833,7 +875,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.RLock() sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - state.ctx.LogP("sp-file", sdsp, "") + state.Ctx.LogP("sp-file", sdsp, "") if state.infosTheir[*file.Hash].Size != ourSize { state.RUnlock() fd.Close() @@ -844,21 +886,21 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { spWorkersGroup.Add(1) go func() { if err := fd.Sync(); err != nil { - state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") + state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") fd.Close() return } state.wg.Add(1) defer state.wg.Done() - fd.Seek(0, 0) - state.ctx.LogD("sp-file", sdsp, "checking") + fd.Seek(0, io.SeekStart) + state.Ctx.LogD("sp-file", sdsp, "checking") gut, err := Check(fd, file.Hash[:]) fd.Close() if err != nil || !gut { - state.ctx.LogE("sp-file", sdsp, "checksum mismatch") + state.Ctx.LogE("sp-file", sdsp, "checksum mismatch") return } - state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") + state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") os.Rename(filePath+PartSuffix, filePath) state.Lock() delete(state.infosTheir, *file.Hash) @@ -869,55 +911,69 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { }() }() case SPTypeDone: - state.ctx.LogD( - "sp-process", - SdsAdd(sds, SDS{"type": "done"}), - "unmarshaling packet", - ) + sdsp := SdsAdd(sds, SDS{"type": "done"}) + state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.ctx.LogE("sp-process", SdsAdd(sds, SDS{ + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ "type": "done", "err": err, }), "") return nil, err } - sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])}) - state.ctx.LogD("sp-done", sdsp, "removing") + sdsp["hash"] = ToBase32(done.Hash[:]) + state.Ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( - state.ctx.Spool, + state.Ctx.Spool, state.Node.Id.String(), string(TTx), ToBase32(done.Hash[:]), )) + sdsp["xx"] = string(TTx) if err == nil { - state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "") + state.Ctx.LogI("sp-done", sdsp, "") } else { - state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "") + state.Ctx.LogE("sp-done", sdsp, "") } case SPTypeFreq: sdsp := SdsAdd(sds, SDS{"type": "freq"}) - state.ctx.LogD("sp-process", sdsp, "unmarshaling packet") + state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") return nil, err } - state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{ - "hash": ToBase32(freq.Hash[:]), - "offset": strconv.FormatInt(int64(freq.Offset), 10), - }), "queueing") - state.Lock() - state.queueTheir = append(state.queueTheir, &freq) - state.Unlock() + sdsp["hash"] = ToBase32(freq.Hash[:]) + sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10) + state.Ctx.LogD("sp-process", sdsp, "queueing") + nice, exists := state.infosOurSeen[*freq.Hash] + if exists { + if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { + state.Lock() + insertIdx := 0 + var freqWithNice *FreqWithNice + for insertIdx, freqWithNice = range state.queueTheir { + if freqWithNice.nice > nice { + break + } + } + state.queueTheir = append(state.queueTheir, nil) + copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:]) + state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} + state.Unlock() + } else { + state.Ctx.LogD("sp-process", sdsp, "skipping") + } + } else { + state.Ctx.LogD("sp-process", sdsp, "unknown") + } case SPTypeHalt: - sdsp := SdsAdd(sds, SDS{"type": "halt"}) - state.ctx.LogD("sp-process", sdsp, "") + state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "") state.Lock() state.queueTheir = nil state.Unlock() default: - state.ctx.LogE( + state.Ctx.LogE( "sp-process", SdsAdd(sds, SDS{"type": head.Type}), "unknown", @@ -934,7 +990,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { size += info.Size } state.RUnlock() - state.ctx.LogI("sp-infos", SDS{ + state.Ctx.LogI("sp-infos", SDS{ "xx": string(TRx), "node": state.Node.Id, "pkts": strconv.Itoa(pkts),