)
const (
- MaxLLPSize = 2<<15 - 256
+ MaxSPSize = 2<<15 - 256
PartSuffix = ".part"
DeadlineDuration = 10
)
var (
MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'L', 1, 0, 0}
- LLPHeadOverhead int
- LLPInfoOverhead int
- LLPFreqOverhead int
- LLPFileOverhead int
- LLPHaltMarshalized []byte
+ SPHeadOverhead int
+ SPInfoOverhead int
+ SPFreqOverhead int
+ SPFileOverhead int
+ SPHaltMarshalized []byte
NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
noise.DH25519,
)
)
-type LLPType uint8
+type SPType uint8
const (
- LLPTypeInfo LLPType = iota
- LLPTypeFreq LLPType = iota
- LLPTypeFile LLPType = iota
- LLPTypeDone LLPType = iota
- LLPTypeHalt LLPType = iota
+ SPTypeInfo SPType = iota
+ SPTypeFreq SPType = iota
+ SPTypeFile SPType = iota
+ SPTypeDone SPType = iota
+ SPTypeHalt SPType = iota
)
-type LLPHead struct {
- Type LLPType
+type SPHead struct {
+ Type SPType
}
-type LLPInfo struct {
+type SPInfo struct {
Nice uint8
Size uint64
Hash *[32]byte
}
-type LLPFreq struct {
+type SPFreq struct {
Hash *[32]byte
Offset uint64
}
-type LLPFile struct {
+type SPFile struct {
Hash *[32]byte
Offset uint64
Payload []byte
}
-type LLPDone struct {
+type SPDone struct {
Hash *[32]byte
}
-type LLPRaw struct {
+type SPRaw struct {
Magic [8]byte
Payload []byte
}
func init() {
var buf bytes.Buffer
- llpHead := LLPHead{Type: LLPTypeHalt}
- if _, err := xdr.Marshal(&buf, llpHead); err != nil {
+ spHead := SPHead{Type: SPTypeHalt}
+ if _, err := xdr.Marshal(&buf, spHead); err != nil {
panic(err)
}
- copy(LLPHaltMarshalized, buf.Bytes())
- LLPHeadOverhead = buf.Len()
+ copy(SPHaltMarshalized, buf.Bytes())
+ SPHeadOverhead = buf.Len()
buf.Reset()
- llpInfo := LLPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
- if _, err := xdr.Marshal(&buf, llpInfo); err != nil {
+ spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
+ if _, err := xdr.Marshal(&buf, spInfo); err != nil {
panic(err)
}
- LLPInfoOverhead = buf.Len()
+ SPInfoOverhead = buf.Len()
buf.Reset()
- llpFreq := LLPFreq{Hash: new([32]byte), Offset: 123}
- if _, err := xdr.Marshal(&buf, llpFreq); err != nil {
+ spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
+ if _, err := xdr.Marshal(&buf, spFreq); err != nil {
panic(err)
}
- LLPFreqOverhead = buf.Len()
+ SPFreqOverhead = buf.Len()
buf.Reset()
- llpFile := LLPFile{Hash: new([32]byte), Offset: 123}
- if _, err := xdr.Marshal(&buf, llpFile); err != nil {
+ spFile := SPFile{Hash: new([32]byte), Offset: 123}
+ if _, err := xdr.Marshal(&buf, spFile); err != nil {
panic(err)
}
- LLPFileOverhead = buf.Len()
+ SPFileOverhead = buf.Len()
}
-func MarshalLLP(typ LLPType, llp interface{}) []byte {
+func MarshalSP(typ SPType, sp interface{}) []byte {
var buf bytes.Buffer
var err error
- if _, err = xdr.Marshal(&buf, LLPHead{typ}); err != nil {
+ if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
panic(err)
}
- if _, err = xdr.Marshal(&buf, llp); err != nil {
+ if _, err = xdr.Marshal(&buf, sp); err != nil {
panic(err)
}
return buf.Bytes()
func payloadsSplit(payloads [][]byte) [][]byte {
var outbounds [][]byte
- outbound := make([]byte, 0, MaxLLPSize)
+ outbound := make([]byte, 0, MaxSPSize)
for i, payload := range payloads {
outbound = append(outbound, payload...)
- if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxLLPSize {
+ if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
outbounds = append(outbounds, outbound)
- outbound = make([]byte, 0, MaxLLPSize)
+ outbound = make([]byte, 0, MaxSPSize)
}
}
if len(outbound) > 0 {
return outbounds
}
-type LLPState struct {
+type SPState struct {
ctx *Ctx
NodeId *NodeId
nice uint8
csOur *noise.CipherState
csTheir *noise.CipherState
payloads chan []byte
- infosTheir map[[32]byte]*LLPInfo
- queueTheir []*LLPFreq
+ infosTheir map[[32]byte]*SPInfo
+ queueTheir []*SPFreq
wg sync.WaitGroup
RxBytes int64
RxLastSeen time.Time
sync.RWMutex
}
-func (state *LLPState) isDead() bool {
+func (state *SPState) isDead() bool {
now := time.Now()
return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration
}
-func (state *LLPState) dirUnlock() {
+func (state *SPState) dirUnlock() {
state.ctx.UnlockDir(state.rxLock)
state.ctx.UnlockDir(state.txLock)
}
-func (state *LLPState) WriteLLP(dst io.Writer, payload []byte) error {
- n, err := xdr.Marshal(dst, LLPRaw{Magic: MagicNNCPLv1, Payload: payload})
+func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
+ n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
if err == nil {
state.TxLastSeen = time.Now()
state.TxBytes += int64(n)
return err
}
-func (state *LLPState) ReadLLP(src io.Reader) ([]byte, error) {
- var llp LLPRaw
- n, err := xdr.Unmarshal(src, &llp)
+func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
+ var sp SPRaw
+ n, err := xdr.Unmarshal(src, &sp)
if err != nil {
return nil, err
}
state.RxLastSeen = time.Now()
state.RxBytes += int64(n)
- if llp.Magic != MagicNNCPLv1 {
+ if sp.Magic != MagicNNCPLv1 {
return nil, BadMagic
}
- return llp.Payload, nil
+ return sp.Payload, nil
}
func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
- var infos []*LLPInfo
+ var infos []*SPInfo
var totalSize int64
for job := range ctx.Jobs(nodeId, TTx) {
job.Fd.Close()
continue
}
totalSize += job.Size
- infos = append(infos, &LLPInfo{
+ infos = append(infos, &SPInfo{
Nice: job.PktEnc.Nice,
Size: uint64(job.Size),
Hash: job.HshValue,
sort.Sort(ByNice(infos))
var payloads [][]byte
for _, info := range infos {
- payloads = append(payloads, MarshalLLP(LLPTypeInfo, info))
- ctx.LogD("llp-info-our", SDS{
+ payloads = append(payloads, MarshalSP(SPTypeInfo, info))
+ ctx.LogD("sp-info-our", SDS{
"node": nodeId,
"name": ToBase32(info.Hash[:]),
"size": strconv.FormatInt(int64(info.Size), 10),
}, "")
}
- ctx.LogI("llp-infos", SDS{
+ ctx.LogI("sp-infos", SDS{
"xx": string(TTx),
"node": nodeId,
"pkts": strconv.Itoa(len(payloads)),
return payloadsSplit(payloads)
}
-func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
+func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) {
err := ctx.ensureRxDir(nodeId)
if err != nil {
return nil, err
},
PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
}
- state := LLPState{
+ state := SPState{
ctx: ctx,
hs: noise.NewHandshakeState(conf),
NodeId: nodeId,
nice: nice,
payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*LLPInfo),
+ infosTheir: make(map[[32]byte]*SPInfo),
started: started,
rxLock: rxLock,
txLock: txLock,
firstPayload = infosPayloads[0]
}
// Pad first payload, to hide actual existing files
- for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
- firstPayload = append(firstPayload, LLPHaltMarshalized...)
+ for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
+ firstPayload = append(firstPayload, SPHaltMarshalized...)
}
var buf []byte
var payload []byte
buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
- ctx.LogD("llp-start", sds, "sending first message")
+ ctx.LogD("sp-start", sds, "sending first message")
conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
- if err = state.WriteLLP(conn, buf); err != nil {
- ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
+ if err = state.WriteSP(conn, buf); err != nil {
+ ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
- ctx.LogD("llp-start", sds, "waiting for first message")
+ ctx.LogD("sp-start", sds, "waiting for first message")
conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
- if buf, err = state.ReadLLP(conn); err != nil {
- ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
+ if buf, err = state.ReadSP(conn); err != nil {
+ ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
if err != nil {
- ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
+ ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
- ctx.LogD("llp-start", sds, "starting workers")
+ ctx.LogD("sp-start", sds, "starting workers")
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
- ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
+ ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
return &state, err
}
-func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
+func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) {
started := time.Now()
conf := noise.Config{
CipherSuite: NoiseCipherSuite,
Public: ctx.Self.NoisePub[:],
},
}
- state := LLPState{
+ state := SPState{
ctx: ctx,
hs: noise.NewHandshakeState(conf),
nice: nice,
payloads: make(chan []byte),
- infosTheir: make(map[[32]byte]*LLPInfo),
+ infosTheir: make(map[[32]byte]*SPInfo),
started: started,
xxOnly: xxOnly,
}
var payload []byte
var err error
ctx.LogD(
- "llp-start",
+ "sp-start",
SDS{"nice": strconv.Itoa(int(nice))},
"waiting for first message",
)
conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
- if buf, err = state.ReadLLP(conn); err != nil {
- ctx.LogE("llp-start", SDS{"err": err}, "")
+ if buf, err = state.ReadSP(conn); err != nil {
+ ctx.LogE("sp-start", SDS{"err": err}, "")
return nil, err
}
if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
- ctx.LogE("llp-start", SDS{"err": err}, "")
+ ctx.LogE("sp-start", SDS{"err": err}, "")
return nil, err
}
}
if nodeId == nil {
peerId := ToBase32(state.hs.PeerStatic())
- ctx.LogE("llp-start", SDS{"peer": peerId}, "unknown")
+ ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
return nil, errors.New("Unknown peer: " + peerId)
}
state.NodeId = nodeId
firstPayload = infosPayloads[0]
}
// Pad first payload, to hide actual existing files
- for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
- firstPayload = append(firstPayload, LLPHaltMarshalized...)
+ for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
+ firstPayload = append(firstPayload, SPHaltMarshalized...)
}
- ctx.LogD("llp-start", sds, "sending first message")
+ ctx.LogD("sp-start", sds, "sending first message")
buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
- if err = state.WriteLLP(conn, buf); err != nil {
- ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
+ if err = state.WriteSP(conn, buf); err != nil {
+ ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
state.dirUnlock()
return nil, err
}
- ctx.LogD("llp-start", sds, "starting workers")
+ ctx.LogD("sp-start", sds, "starting workers")
err = state.StartWorkers(conn, infosPayloads, payload)
if err != nil {
state.dirUnlock()
return &state, err
}
-func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
+func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
if len(infosPayloads) > 1 {
go func() {
for _, payload := range infosPayloads[1:] {
state.ctx.LogD(
- "llp-work",
+ "sp-work",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"queuing remaining payload",
)
}()
}
state.ctx.LogD(
- "llp-work",
+ "sp-work",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"processing first payload",
)
- replies, err := state.ProcessLLP(payload)
+ replies, err := state.ProcessSP(payload)
if err != nil {
- state.ctx.LogE("llp-work", SdsAdd(sds, SDS{"err": err}), "")
+ state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
return err
}
go func() {
for _, reply := range replies {
state.ctx.LogD(
- "llp-work",
+ "sp-work",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
"queuing reply",
)
select {
case payload = <-state.payloads:
state.ctx.LogD(
- "llp-xmit",
+ "sp-xmit",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"got payload",
)
if payload == nil {
state.RLock()
if len(state.queueTheir) == 0 {
- state.ctx.LogD("llp-xmit", sds, "file queue is empty")
+ state.ctx.LogD("sp-xmit", sds, "file queue is empty")
state.RUnlock()
time.Sleep(100 * time.Millisecond)
continue
"hash": ToBase32(freq.Hash[:]),
"size": strconv.FormatInt(int64(freq.Offset), 10),
})
- state.ctx.LogD("llp-file", sdsp, "queueing")
+ state.ctx.LogD("sp-file", sdsp, "queueing")
fd, err := os.Open(filepath.Join(
state.ctx.Spool,
state.NodeId.String(),
ToBase32(freq.Hash[:]),
))
if err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
fi, err := fd.Stat()
if err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
fullSize := uint64(fi.Size())
var buf []byte
if freq.Offset < fullSize {
- state.ctx.LogD("llp-file", sdsp, "seeking")
+ state.ctx.LogD("sp-file", sdsp, "seeking")
if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
- buf = make([]byte, MaxLLPSize-LLPHeadOverhead-LLPFileOverhead)
+ buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
n, err := fd.Read(buf)
if err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
break
}
buf = buf[:n]
state.ctx.LogD(
- "llp-file",
+ "sp-file",
SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
"read",
)
}
fd.Close()
- payload = MarshalLLP(LLPTypeFile, LLPFile{
+ payload = MarshalSP(SPTypeFile, SPFile{
Hash: freq.Hash,
Offset: freq.Offset,
Payload: buf,
ourSize := freq.Offset + uint64(len(buf))
sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
- state.ctx.LogP("llp-file", sdsp, "")
+ state.ctx.LogP("sp-file", sdsp, "")
state.Lock()
if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
if ourSize == fullSize {
- state.ctx.LogD("llp-file", sdsp, "finished")
+ state.ctx.LogD("sp-file", sdsp, "finished")
if len(state.queueTheir) > 1 {
state.queueTheir = state.queueTheir[1:]
} else {
state.queueTheir[0].Offset += uint64(len(buf))
}
} else {
- state.ctx.LogD("llp-file", sdsp, "queue disappeared")
+ state.ctx.LogD("sp-file", sdsp, "queue disappeared")
}
state.Unlock()
}
state.ctx.LogD(
- "llp-xmit",
+ "sp-xmit",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"sending",
)
conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
- if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
- state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "")
+ if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
+ state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
break
}
}
if state.isDead() {
return
}
- state.ctx.LogD("llp-recv", sds, "waiting for payload")
+ state.ctx.LogD("sp-recv", sds, "waiting for payload")
conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
- payload, err := state.ReadLLP(conn)
+ payload, err := state.ReadSP(conn)
if err != nil {
unmarshalErr := err.(*xdr.UnmarshalError)
netErr, ok := unmarshalErr.Err.(net.Error)
if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO {
continue
} else {
- state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
break
}
}
state.ctx.LogD(
- "llp-recv",
+ "sp-recv",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"got payload",
)
payload, err = state.csTheir.Decrypt(nil, nil, payload)
if err != nil {
- state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
break
}
state.ctx.LogD(
- "llp-recv",
+ "sp-recv",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
"processing",
)
- replies, err := state.ProcessLLP(payload)
+ replies, err := state.ProcessSP(payload)
if err != nil {
- state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
+ state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
break
}
go func() {
for _, reply := range replies {
state.ctx.LogD(
- "llp-recv",
+ "sp-recv",
SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
"queuing reply",
)
return nil
}
-func (state *LLPState) Wait() {
+func (state *SPState) Wait() {
state.wg.Wait()
state.dirUnlock()
state.Duration = time.Now().Sub(state.started)
}
}
-func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) {
+func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
r := bytes.NewReader(payload)
var err error
var replies [][]byte
var infosGot bool
for r.Len() > 0 {
- state.ctx.LogD("llp-process", sds, "unmarshaling header")
- var head LLPHead
+ state.ctx.LogD("sp-process", sds, "unmarshaling header")
+ var head SPHead
if _, err = xdr.Unmarshal(r, &head); err != nil {
- state.ctx.LogE("llp-process", SdsAdd(sds, SDS{"err": err}), "")
+ state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
return nil, err
}
switch head.Type {
- case LLPTypeInfo:
+ case SPTypeInfo:
infosGot = true
sdsp := SdsAdd(sds, SDS{"type": "info"})
- state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
- var info LLPInfo
+ state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ var info SPInfo
if _, err = xdr.Unmarshal(r, &info); err != nil {
- state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
return nil, err
}
sdsp = SdsAdd(sds, SDS{
"size": strconv.FormatInt(int64(info.Size), 10),
})
if info.Nice > state.nice {
- state.ctx.LogD("llp-process", sdsp, "too nice")
+ state.ctx.LogD("sp-process", sdsp, "too nice")
continue
}
- state.ctx.LogD("llp-process", sdsp, "received")
+ state.ctx.LogD("sp-process", sdsp, "received")
if state.xxOnly != nil && *state.xxOnly == TTx {
continue
}
state.Lock()
state.infosTheir[*info.Hash] = &info
state.Unlock()
- state.ctx.LogD("llp-process", sdsp, "stating part")
+ state.ctx.LogD("sp-process", sdsp, "stating part")
if _, err = os.Stat(filepath.Join(
state.ctx.Spool,
state.NodeId.String(),
string(TRx),
ToBase32(info.Hash[:]),
)); err == nil {
- state.ctx.LogD("llp-process", sdsp, "already done")
- replies = append(replies, MarshalLLP(LLPTypeDone, LLPDone{info.Hash}))
+ state.ctx.LogD("sp-process", sdsp, "already done")
+ replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
continue
}
fi, err := os.Stat(filepath.Join(
if err == nil {
offset = fi.Size()
state.ctx.LogD(
- "llp-process",
+ "sp-process",
SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
"part exists",
)
}
- replies = append(replies, MarshalLLP(
- LLPTypeFreq,
- LLPFreq{info.Hash, uint64(offset)},
+ replies = append(replies, MarshalSP(
+ SPTypeFreq,
+ SPFreq{info.Hash, uint64(offset)},
))
- case LLPTypeFile:
+ case SPTypeFile:
state.ctx.LogD(
- "llp-process",
+ "sp-process",
SdsAdd(sds, SDS{"type": "file"}),
"unmarshaling packet",
)
- var file LLPFile
+ var file SPFile
if _, err = xdr.Unmarshal(r, &file); err != nil {
- state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
+ state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
"err": err,
"type": "file",
}), "")
string(TRx),
ToBase32(file.Hash[:]),
)
- state.ctx.LogD("llp-file", sdsp, "opening part")
+ state.ctx.LogD("sp-file", sdsp, "opening part")
fd, err := os.OpenFile(
filePath+PartSuffix,
os.O_RDWR|os.O_CREATE,
os.FileMode(0600),
)
if err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
return nil, err
}
state.ctx.LogD(
- "llp-file",
+ "sp-file",
SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
"seeking",
)
if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
fd.Close()
return nil, err
}
- state.ctx.LogD("llp-file", sdsp, "writing")
+ state.ctx.LogD("sp-file", sdsp, "writing")
_, err = fd.Write(file.Payload)
if err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
fd.Close()
return nil, err
}
ourSize := uint64(file.Offset) + uint64(len(file.Payload))
sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
- state.ctx.LogP("llp-file", sdsp, "")
+ state.ctx.LogP("sp-file", sdsp, "")
if state.infosTheir[*file.Hash].Size != ourSize {
fd.Close()
continue
}
go func() {
if err := fd.Sync(); err != nil {
- state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
+ state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
fd.Close()
return
}
state.wg.Add(1)
defer state.wg.Done()
fd.Seek(0, 0)
- state.ctx.LogD("llp-file", sdsp, "checking")
+ state.ctx.LogD("sp-file", sdsp, "checking")
gut, err := Check(fd, file.Hash[:])
fd.Close()
if err != nil || !gut {
- state.ctx.LogE("llp-file", sdsp, "checksum mismatch")
+ state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
return
}
- state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
+ state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
os.Rename(filePath+PartSuffix, filePath)
- state.payloads <- MarshalLLP(LLPTypeDone, LLPDone{file.Hash})
+ state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
}()
- case LLPTypeDone:
+ case SPTypeDone:
state.ctx.LogD(
- "llp-process",
+ "sp-process",
SdsAdd(sds, SDS{"type": "done"}),
"unmarshaling packet",
)
- var done LLPDone
+ var done SPDone
if _, err = xdr.Unmarshal(r, &done); err != nil {
- state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
+ state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
"type": "done",
"err": err,
}), "")
return nil, err
}
sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
- state.ctx.LogD("llp-done", sdsp, "removing")
+ state.ctx.LogD("sp-done", sdsp, "removing")
err := os.Remove(filepath.Join(
state.ctx.Spool,
state.NodeId.String(),
ToBase32(done.Hash[:]),
))
if err == nil {
- state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
+ state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
} else {
- state.ctx.LogE("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
+ state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
}
- case LLPTypeFreq:
+ case SPTypeFreq:
sdsp := SdsAdd(sds, SDS{"type": "freq"})
- state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
- var freq LLPFreq
+ state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
+ var freq SPFreq
if _, err = xdr.Unmarshal(r, &freq); err != nil {
- state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
+ state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
return nil, err
}
- state.ctx.LogD("llp-process", SdsAdd(sdsp, SDS{
+ state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
"hash": ToBase32(freq.Hash[:]),
"offset": strconv.FormatInt(int64(freq.Offset), 10),
}), "queueing")
state.Lock()
state.queueTheir = append(state.queueTheir, &freq)
state.Unlock()
- case LLPTypeHalt:
+ case SPTypeHalt:
sdsp := SdsAdd(sds, SDS{"type": "halt"})
- state.ctx.LogD("llp-process", sdsp, "")
+ state.ctx.LogD("sp-process", sdsp, "")
state.Lock()
state.queueTheir = nil
state.Unlock()
default:
state.ctx.LogE(
- "llp-process",
+ "sp-process",
SdsAdd(sds, SDS{"type": head.Type}),
"unknown",
)
pkts++
size += info.Size
}
- state.ctx.LogI("llp-infos", SDS{
+ state.ctx.LogI("sp-infos", SDS{
"xx": string(TRx),
"node": state.NodeId,
"pkts": strconv.Itoa(pkts),