/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
-the Free Software Foundation, either version 3 of the License, or
-(at your option) any later version.
+the Free Software Foundation, version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
)
const (
- MaxSPSize = 2<<16 - 256
+ MaxSPSize = 1<<16 - 256
PartSuffix = ".part"
DefaultDeadline = 10
)
noise.CipherChaChaPoly,
noise.HashBLAKE2b,
)
+
+ spWorkersGroup sync.WaitGroup
)
type SPType uint8
Payload []byte
}
+type FreqWithNice struct {
+ freq *SPFreq
+ nice uint8
+}
+
+type ConnDeadlined interface {
+ io.ReadWriter
+ SetReadDeadline(t time.Time) error
+ SetWriteDeadline(t time.Time) error
+}
+
func init() {
var buf bytes.Buffer
spHead := SPHead{Type: SPTypeHalt}
}
type SPState struct {
- ctx *Ctx
- Node *Node
- nice uint8
- hs *noise.HandshakeState
- csOur *noise.CipherState
- csTheir *noise.CipherState
- payloads chan []byte
- infosTheir map[[32]byte]*SPInfo
- infosOurSeen map[[32]byte]struct{}
- queueTheir []*SPFreq
- wg sync.WaitGroup
- RxBytes int64
- RxLastSeen time.Time
- TxBytes int64
- TxLastSeen time.Time
- started time.Time
- Duration time.Duration
- RxSpeed int64
- TxSpeed int64
- rxLock *os.File
- txLock *os.File
- xxOnly *TRxTx
- isDead bool
+ Ctx *Ctx
+ Node *Node
+ Nice uint8
+ onlineDeadline uint
+ maxOnlineTime uint
+ hs *noise.HandshakeState
+ csOur *noise.CipherState
+ csTheir *noise.CipherState
+ payloads chan []byte
+ infosTheir map[[32]byte]*SPInfo
+ infosOurSeen map[[32]byte]uint8
+ queueTheir []*FreqWithNice
+ wg sync.WaitGroup
+ RxBytes int64
+ RxLastSeen time.Time
+ TxBytes int64
+ TxLastSeen time.Time
+ started time.Time
+ Duration time.Duration
+ RxSpeed int64
+ TxSpeed int64
+ rxLock *os.File
+ txLock *os.File
+ xxOnly TRxTx
+ rxRate int
+ txRate int
+ isDead bool
+ listOnly bool
+ onlyPkts map[[32]byte]bool
sync.RWMutex
}
func (state *SPState) NotAlive() bool {
+ if state.isDead {
+ return true
+ }
now := time.Now()
- return state.isDead || (int(now.Sub(state.RxLastSeen).Seconds()) >= state.Node.OnlineDeadline && int(now.Sub(state.TxLastSeen).Seconds()) >= state.Node.OnlineDeadline)
+ if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
+ return true
+ }
+ return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
+ uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
}
func (state *SPState) dirUnlock() {
- state.ctx.UnlockDir(state.rxLock)
- state.ctx.UnlockDir(state.txLock)
+ state.Ctx.UnlockDir(state.rxLock)
+ state.Ctx.UnlockDir(state.txLock)
}
func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
var sp SPRaw
- n, err := xdr.UnmarshalLimited(src, &sp, 2<<17)
+ n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
if err != nil {
return nil, err
}
return sp.Payload, nil
}
-func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
+func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
Size: uint64(job.Size),
Hash: job.HshValue,
})
- (*seen)[*job.HshValue] = struct{}{}
+ (*seen)[*job.HshValue] = job.PktEnc.Nice
}
sort.Sort(ByNice(infos))
var payloads [][]byte
return payloadsSplit(payloads)
}
-func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) {
- err := ctx.ensureRxDir(nodeId)
+func (state *SPState) StartI(conn ConnDeadlined) error {
+ nodeId := state.Node.Id
+ err := state.Ctx.ensureRxDir(nodeId)
if err != nil {
- return nil, err
+ return err
}
var rxLock *os.File
- if xxOnly != nil && *xxOnly == TRx {
- rxLock, err = ctx.LockDir(nodeId, TRx)
+ if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
+ rxLock, err = state.Ctx.LockDir(nodeId, TRx)
if err != nil {
- return nil, err
+ return err
}
}
var txLock *os.File
- if xxOnly != nil && *xxOnly == TTx {
- txLock, err = ctx.LockDir(nodeId, TTx)
+ if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
+ txLock, err = state.Ctx.LockDir(nodeId, TTx)
if err != nil {
- return nil, err
+ return err
}
}
started := time.Now()
- node := ctx.Neigh[*nodeId]
conf := noise.Config{
CipherSuite: NoiseCipherSuite,
Pattern: noise.HandshakeIK,
Initiator: true,
StaticKeypair: noise.DHKey{
- Private: ctx.Self.NoisePrv[:],
- Public: ctx.Self.NoisePub[:],
+ Private: state.Ctx.Self.NoisePrv[:],
+ Public: state.Ctx.Self.NoisePub[:],
},
- PeerStatic: node.NoisePub[:],
- }
- state := SPState{
- ctx: ctx,
- hs: noise.NewHandshakeState(conf),
- Node: node,
- nice: nice,
- payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*SPInfo),
- infosOurSeen: make(map[[32]byte]struct{}),
- started: started,
- rxLock: rxLock,
- txLock: txLock,
- xxOnly: xxOnly,
+ PeerStatic: state.Node.NoisePub[:],
}
+ hs, err := noise.NewHandshakeState(conf)
+ if err != nil {
+ return err
+ }
+ state.hs = hs
+ state.payloads = make(chan []byte)
+ state.infosTheir = make(map[[32]byte]*SPInfo)
+ state.infosOurSeen = make(map[[32]byte]uint8)
+ state.started = started
+ state.rxLock = rxLock
+ state.txLock = txLock
var infosPayloads [][]byte
- if xxOnly == nil || *xxOnly != TTx {
- infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
+ if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
+ infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
}
var firstPayload []byte
if len(infosPayloads) > 0 {
var buf []byte
var payload []byte
- buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
- sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
- ctx.LogD("sp-start", sds, "sending first message")
+ buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
+ if err != nil {
+ state.dirUnlock()
+ return err
+ }
+ sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(state.Nice))}
+ state.Ctx.LogD("sp-start", sds, "sending first message")
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err = state.WriteSP(conn, buf); err != nil {
- ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
- return nil, err
+ return err
}
- ctx.LogD("sp-start", sds, "waiting for first message")
+ state.Ctx.LogD("sp-start", sds, "waiting for first message")
conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
if buf, err = state.ReadSP(conn); err != nil {
- ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
- return nil, err
+ return err
}
payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
if err != nil {
- ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
- return nil, err
+ return err
}
- ctx.LogD("sp-start", sds, "starting workers")
+ state.Ctx.LogD("sp-start", sds, "starting workers")
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
- ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
- return nil, err
+ return err
}
- return &state, err
+ return err
}
-func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) {
+func (state *SPState) StartR(conn ConnDeadlined) error {
started := time.Now()
conf := noise.Config{
CipherSuite: NoiseCipherSuite,
Pattern: noise.HandshakeIK,
Initiator: false,
StaticKeypair: noise.DHKey{
- Private: ctx.Self.NoisePrv[:],
- Public: ctx.Self.NoisePub[:],
+ Private: state.Ctx.Self.NoisePrv[:],
+ Public: state.Ctx.Self.NoisePub[:],
},
}
- state := SPState{
- ctx: ctx,
- hs: noise.NewHandshakeState(conf),
- nice: nice,
- payloads: make(chan []byte),
- infosOurSeen: make(map[[32]byte]struct{}),
- infosTheir: make(map[[32]byte]*SPInfo),
- started: started,
- xxOnly: xxOnly,
+ hs, err := noise.NewHandshakeState(conf)
+ if err != nil {
+ return err
}
+ xxOnly := TRxTx("")
+ state.hs = hs
+ state.payloads = make(chan []byte)
+ state.infosOurSeen = make(map[[32]byte]uint8)
+ state.infosTheir = make(map[[32]byte]*SPInfo)
+ state.started = started
+ state.xxOnly = xxOnly
var buf []byte
var payload []byte
- var err error
- ctx.LogD(
+ state.Ctx.LogD(
"sp-start",
- SDS{"nice": strconv.Itoa(int(nice))},
+ SDS{"nice": strconv.Itoa(int(state.Nice))},
"waiting for first message",
)
conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
if buf, err = state.ReadSP(conn); err != nil {
- ctx.LogE("sp-start", SDS{"err": err}, "")
- return nil, err
+ state.Ctx.LogE("sp-start", SDS{"err": err}, "")
+ return err
}
if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
- ctx.LogE("sp-start", SDS{"err": err}, "")
- return nil, err
+ state.Ctx.LogE("sp-start", SDS{"err": err}, "")
+ return err
}
var node *Node
- for _, node = range ctx.Neigh {
+ for _, node = range state.Ctx.Neigh {
if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
break
}
}
if node == nil {
peerId := ToBase32(state.hs.PeerStatic())
- ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
- return nil, errors.New("Unknown peer: " + peerId)
+ state.Ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
+ return errors.New("Unknown peer: " + peerId)
}
state.Node = node
- sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
+ state.rxRate = node.RxRate
+ state.txRate = node.TxRate
+ state.onlineDeadline = node.OnlineDeadline
+ state.maxOnlineTime = node.MaxOnlineTime
+ sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(state.Nice))}
- if ctx.ensureRxDir(node.Id); err != nil {
- return nil, err
+ if state.Ctx.ensureRxDir(node.Id); err != nil {
+ return err
}
var rxLock *os.File
- if xxOnly != nil && *xxOnly == TRx {
- rxLock, err = ctx.LockDir(node.Id, TRx)
+ if xxOnly == "" || xxOnly == TRx {
+ rxLock, err = state.Ctx.LockDir(node.Id, TRx)
if err != nil {
- return nil, err
+ return err
}
}
state.rxLock = rxLock
var txLock *os.File
- if xxOnly != nil && *xxOnly == TTx {
- txLock, err = ctx.LockDir(node.Id, TTx)
+ if xxOnly == "" || xxOnly == TTx {
+ txLock, err = state.Ctx.LockDir(node.Id, TTx)
if err != nil {
- return nil, err
+ return err
}
}
state.txLock = txLock
var infosPayloads [][]byte
- if xxOnly == nil || *xxOnly != TTx {
- infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
+ if xxOnly == "" || xxOnly == TTx {
+ infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
}
var firstPayload []byte
if len(infosPayloads) > 0 {
firstPayload = append(firstPayload, SPHaltMarshalized...)
}
- ctx.LogD("sp-start", sds, "sending first message")
- buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
+ state.Ctx.LogD("sp-start", sds, "sending first message")
+ buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
+ if err != nil {
+ state.dirUnlock()
+ return err
+ }
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err = state.WriteSP(conn, buf); err != nil {
- ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
- return nil, err
+ return err
}
- ctx.LogD("sp-start", sds, "starting workers")
+ state.Ctx.LogD("sp-start", sds, "starting workers")
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
state.dirUnlock()
- return nil, err
+ return err
}
- return &state, err
+ return err
}
-func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
- sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
+func (state *SPState) StartWorkers(
+ conn ConnDeadlined,
+ infosPayloads [][]byte,
+ payload []byte) error {
+ sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))}
if len(infosPayloads) > 1 {
go func() {
for _, payload := range infosPayloads[1:] {
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-work",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"queuing remaining payload",
}
}()
}
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-work",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"processing first payload",
)
replies, err := state.ProcessSP(payload)
if err != nil {
- state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
return err
}
go func() {
for _, reply := range replies {
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-work",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
"queuing reply",
}
}()
- go func() {
- for range time.Tick(time.Second) {
- for _, payload := range state.ctx.infosOur(
- state.Node.Id,
- state.nice,
- &state.infosOurSeen,
- ) {
- state.ctx.LogD(
- "sp-work",
- SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
- "queuing new info",
- )
- state.payloads <- payload
+ if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
+ go func() {
+ for range time.Tick(time.Second) {
+ if state.NotAlive() {
+ return
+ }
+ for _, payload := range state.Ctx.infosOur(
+ state.Node.Id,
+ state.Nice,
+ &state.infosOurSeen,
+ ) {
+ state.Ctx.LogD(
+ "sp-work",
+ SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+ "queuing new info",
+ )
+ state.payloads <- payload
+ }
}
- }
- }()
+ }()
+ }
state.wg.Add(1)
go func() {
var payload []byte
select {
case payload = <-state.payloads:
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-xmit",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"got payload",
if payload == nil {
state.RLock()
if len(state.queueTheir) == 0 {
- state.ctx.LogD("sp-xmit", sds, "file queue is empty")
+ state.Ctx.LogD("sp-xmit", sds, "file queue is empty")
state.RUnlock()
time.Sleep(100 * time.Millisecond)
continue
}
- freq := state.queueTheir[0]
+ freq := state.queueTheir[0].freq
state.RUnlock()
+
+ if state.txRate > 0 {
+ time.Sleep(time.Second / time.Duration(state.txRate))
+ }
+
sdsp := SdsAdd(sds, SDS{
"xx": string(TTx),
"hash": ToBase32(freq.Hash[:]),
"size": strconv.FormatInt(int64(freq.Offset), 10),
})
- state.ctx.LogD("sp-file", sdsp, "queueing")
+ state.Ctx.LogD("sp-file", sdsp, "queueing")
fd, err := os.Open(filepath.Join(
- state.ctx.Spool,
+ state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
ToBase32(freq.Hash[:]),
))
if err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
fi, err := fd.Stat()
if err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
fullSize := uint64(fi.Size())
var buf []byte
if freq.Offset < fullSize {
- state.ctx.LogD("sp-file", sdsp, "seeking")
+ state.Ctx.LogD("sp-file", sdsp, "seeking")
if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
n, err := fd.Read(buf)
if err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
buf = buf[:n]
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-file",
SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
"read",
ourSize := freq.Offset + uint64(len(buf))
sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
- state.ctx.LogP("sp-file", sdsp, "")
+ state.Ctx.LogP("sp-file", sdsp, "")
state.Lock()
- if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
+ if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
if ourSize == fullSize {
- state.ctx.LogD("sp-file", sdsp, "finished")
+ state.Ctx.LogD("sp-file", sdsp, "finished")
if len(state.queueTheir) > 1 {
state.queueTheir = state.queueTheir[1:]
} else {
state.queueTheir = state.queueTheir[:0]
}
} else {
- state.queueTheir[0].Offset += uint64(len(buf))
+ state.queueTheir[0].freq.Offset += uint64(len(buf))
}
} else {
- state.ctx.LogD("sp-file", sdsp, "queue disappeared")
+ state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
}
state.Unlock()
}
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-xmit",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"sending",
)
- conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
- state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
break
}
}
if state.NotAlive() {
return
}
- state.ctx.LogD("sp-recv", sds, "waiting for payload")
+ state.Ctx.LogD("sp-recv", sds, "waiting for payload")
conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
payload, err := state.ReadSP(conn)
if err != nil {
if unmarshalErr.ErrorCode == xdr.ErrIO {
break
}
- state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
break
}
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-recv",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"got payload",
)
payload, err = state.csTheir.Decrypt(nil, nil, payload)
if err != nil {
- state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
break
}
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-recv",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"processing",
)
replies, err := state.ProcessSP(payload)
if err != nil {
- state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
break
}
go func() {
for _, reply := range replies {
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-recv",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
"queuing reply",
state.payloads <- reply
}
}()
+ if state.rxRate > 0 {
+ time.Sleep(time.Second / time.Duration(state.rxRate))
+ }
}
}()
}
func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
- sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
+ sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))}
r := bytes.NewReader(payload)
var err error
var replies [][]byte
var infosGot bool
for r.Len() > 0 {
- state.ctx.LogD("sp-process", sds, "unmarshaling header")
+ state.Ctx.LogD("sp-process", sds, "unmarshaling header")
var head SPHead
if _, err = xdr.Unmarshal(r, &head); err != nil {
- state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
return nil, err
}
switch head.Type {
case SPTypeInfo:
infosGot = true
sdsp := SdsAdd(sds, SDS{"type": "info"})
- state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
var info SPInfo
if _, err = xdr.Unmarshal(r, &info); err != nil {
- state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
return nil, err
}
sdsp = SdsAdd(sds, SDS{
"hash": ToBase32(info.Hash[:]),
"size": strconv.FormatInt(int64(info.Size), 10),
+ "nice": strconv.Itoa(int(info.Nice)),
})
- if info.Nice > state.nice {
- state.ctx.LogD("sp-process", sdsp, "too nice")
+ if !state.listOnly && info.Nice > state.Nice {
+ state.Ctx.LogD("sp-process", sdsp, "too nice")
continue
}
- state.ctx.LogD("sp-process", sdsp, "received")
- if state.xxOnly != nil && *state.xxOnly == TTx {
+ state.Ctx.LogD("sp-process", sdsp, "received")
+ if !state.listOnly && state.xxOnly == TTx {
continue
}
state.Lock()
state.infosTheir[*info.Hash] = &info
state.Unlock()
- state.ctx.LogD("sp-process", sdsp, "stating part")
- if _, err = os.Stat(filepath.Join(
- state.ctx.Spool,
+ state.Ctx.LogD("sp-process", sdsp, "stating part")
+ pktPath := filepath.Join(
+ state.Ctx.Spool,
state.Node.Id.String(),
string(TRx),
ToBase32(info.Hash[:]),
- )); err == nil {
- state.ctx.LogD("sp-process", sdsp, "already done")
- replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+ )
+ if _, err = os.Stat(pktPath); err == nil {
+ state.Ctx.LogI("sp-info", sdsp, "already done")
+ if !state.listOnly {
+ replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+ }
continue
}
- fi, err := os.Stat(filepath.Join(
- state.ctx.Spool,
- state.Node.Id.String(),
- string(TRx),
- ToBase32(info.Hash[:])+PartSuffix,
- ))
+ if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+ state.Ctx.LogI("sp-info", sdsp, "already seen")
+ if !state.listOnly {
+ replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+ }
+ continue
+ }
+ fi, err := os.Stat(pktPath + PartSuffix)
var offset int64
if err == nil {
offset = fi.Size()
- state.ctx.LogD(
- "sp-process",
- SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
- "part exists",
- )
}
- replies = append(replies, MarshalSP(
- SPTypeFreq,
- SPFreq{info.Hash, uint64(offset)},
- ))
- case SPTypeFile:
- state.ctx.LogD(
- "sp-process",
- SdsAdd(sds, SDS{"type": "file"}),
- "unmarshaling packet",
+ if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
+ state.Ctx.LogI("sp-info", sdsp, "not enough space")
+ continue
+ }
+ state.Ctx.LogI(
+ "sp-info",
+ SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
+ "",
)
+ if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
+ replies = append(replies, MarshalSP(
+ SPTypeFreq,
+ SPFreq{info.Hash, uint64(offset)},
+ ))
+ }
+ case SPTypeFile:
+ sdsp := SdsAdd(sds, SDS{"type": "file"})
+ state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
var file SPFile
if _, err = xdr.Unmarshal(r, &file); err != nil {
- state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
+ state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{
"err": err,
"type": "file",
}), "")
return nil, err
}
- sdsp := SdsAdd(sds, SDS{
- "xx": string(TRx),
- "hash": ToBase32(file.Hash[:]),
- "size": strconv.Itoa(len(file.Payload)),
- })
+ sdsp["xx"] = string(TRx)
+ sdsp["hash"] = ToBase32(file.Hash[:])
+ sdsp["size"] = strconv.Itoa(len(file.Payload))
filePath := filepath.Join(
- state.ctx.Spool,
+ state.Ctx.Spool,
state.Node.Id.String(),
string(TRx),
ToBase32(file.Hash[:]),
)
- state.ctx.LogD("sp-file", sdsp, "opening part")
+ state.Ctx.LogD("sp-file", sdsp, "opening part")
fd, err := os.OpenFile(
filePath+PartSuffix,
os.O_RDWR|os.O_CREATE,
os.FileMode(0600),
)
if err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
return nil, err
}
- state.ctx.LogD(
+ state.Ctx.LogD(
"sp-file",
SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
"seeking",
)
if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
fd.Close()
return nil, err
}
- state.ctx.LogD("sp-file", sdsp, "writing")
+ state.Ctx.LogD("sp-file", sdsp, "writing")
_, err = fd.Write(file.Payload)
if err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
fd.Close()
return nil, err
}
ourSize := uint64(file.Offset) + uint64(len(file.Payload))
+ state.RLock()
sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
- state.ctx.LogP("sp-file", sdsp, "")
+ state.Ctx.LogP("sp-file", sdsp, "")
if state.infosTheir[*file.Hash].Size != ourSize {
+ state.RUnlock()
fd.Close()
continue
}
+ state.RUnlock()
+ spWorkersGroup.Wait()
+ spWorkersGroup.Add(1)
go func() {
if err := fd.Sync(); err != nil {
- state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
+ state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
fd.Close()
return
}
state.wg.Add(1)
defer state.wg.Done()
fd.Seek(0, io.SeekStart)
- state.ctx.LogD("sp-file", sdsp, "checking")
+ state.Ctx.LogD("sp-file", sdsp, "checking")
gut, err := Check(fd, file.Hash[:])
fd.Close()
if err != nil || !gut {
- state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
+ state.Ctx.LogE("sp-file", sdsp, "checksum mismatch")
return
}
- state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
+ state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
os.Rename(filePath+PartSuffix, filePath)
+ state.Lock()
+ delete(state.infosTheir, *file.Hash)
+ state.Unlock()
+ spWorkersGroup.Done()
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
}()
}()
case SPTypeDone:
- state.ctx.LogD(
- "sp-process",
- SdsAdd(sds, SDS{"type": "done"}),
- "unmarshaling packet",
- )
+ sdsp := SdsAdd(sds, SDS{"type": "done"})
+ state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
var done SPDone
if _, err = xdr.Unmarshal(r, &done); err != nil {
- state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
+ state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{
"type": "done",
"err": err,
}), "")
return nil, err
}
- sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
- state.ctx.LogD("sp-done", sdsp, "removing")
+ sdsp["hash"] = ToBase32(done.Hash[:])
+ state.Ctx.LogD("sp-done", sdsp, "removing")
err := os.Remove(filepath.Join(
- state.ctx.Spool,
+ state.Ctx.Spool,
state.Node.Id.String(),
string(TTx),
ToBase32(done.Hash[:]),
))
+ sdsp["xx"] = string(TTx)
if err == nil {
- state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
+ state.Ctx.LogI("sp-done", sdsp, "")
} else {
- state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
+ state.Ctx.LogE("sp-done", sdsp, "")
}
case SPTypeFreq:
sdsp := SdsAdd(sds, SDS{"type": "freq"})
- state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
var freq SPFreq
if _, err = xdr.Unmarshal(r, &freq); err != nil {
- state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
return nil, err
}
- state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
- "hash": ToBase32(freq.Hash[:]),
- "offset": strconv.FormatInt(int64(freq.Offset), 10),
- }), "queueing")
- state.Lock()
- state.queueTheir = append(state.queueTheir, &freq)
- state.Unlock()
+ sdsp["hash"] = ToBase32(freq.Hash[:])
+ sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10)
+ state.Ctx.LogD("sp-process", sdsp, "queueing")
+ nice, exists := state.infosOurSeen[*freq.Hash]
+ if exists {
+ if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
+ state.Lock()
+ insertIdx := 0
+ var freqWithNice *FreqWithNice
+ for insertIdx, freqWithNice = range state.queueTheir {
+ if freqWithNice.nice > nice {
+ break
+ }
+ }
+ state.queueTheir = append(state.queueTheir, nil)
+ copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
+ state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
+ state.Unlock()
+ } else {
+ state.Ctx.LogD("sp-process", sdsp, "skipping")
+ }
+ } else {
+ state.Ctx.LogD("sp-process", sdsp, "unknown")
+ }
case SPTypeHalt:
- sdsp := SdsAdd(sds, SDS{"type": "halt"})
- state.ctx.LogD("sp-process", sdsp, "")
+ state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
state.Lock()
state.queueTheir = nil
state.Unlock()
default:
- state.ctx.LogE(
+ state.Ctx.LogE(
"sp-process",
SdsAdd(sds, SDS{"type": head.Type}),
"unknown",
if infosGot {
var pkts int
var size uint64
+ state.RLock()
for _, info := range state.infosTheir {
pkts++
size += info.Size
}
- state.ctx.LogI("sp-infos", SDS{
+ state.RUnlock()
+ state.Ctx.LogI("sp-infos", SDS{
"xx": string(TRx),
"node": state.Node.Id,
"pkts": strconv.Itoa(pkts),