/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
noise.CipherChaChaPoly,
noise.HashBLAKE2b,
)
+
+ spWorkersGroup sync.WaitGroup
)
type SPType uint8
TxSpeed int64
rxLock *os.File
txLock *os.File
- xxOnly *TRxTx
+ xxOnly TRxTx
isDead bool
sync.RWMutex
}
return payloadsSplit(payloads)
}
-func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) {
+func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) {
err := ctx.ensureRxDir(nodeId)
if err != nil {
return nil, err
}
var rxLock *os.File
- if xxOnly != nil && *xxOnly == TRx {
+ if xxOnly == "" || xxOnly == TRx {
rxLock, err = ctx.LockDir(nodeId, TRx)
if err != nil {
return nil, err
}
}
var txLock *os.File
- if xxOnly != nil && *xxOnly == TTx {
+ if xxOnly == "" || xxOnly == TTx {
txLock, err = ctx.LockDir(nodeId, TTx)
if err != nil {
return nil, err
},
PeerStatic: node.NoisePub[:],
}
+ hs, err := noise.NewHandshakeState(conf)
+ if err != nil {
+ return nil, err
+ }
state := SPState{
ctx: ctx,
- hs: noise.NewHandshakeState(conf),
+ hs: hs,
Node: node,
onlineDeadline: onlineDeadline,
maxOnlineTime: maxOnlineTime,
}
var infosPayloads [][]byte
- if xxOnly == nil || *xxOnly != TTx {
+ if xxOnly == "" || xxOnly == TTx {
infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
}
var firstPayload []byte
var buf []byte
var payload []byte
- buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
+ buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
+ if err != nil {
+ state.dirUnlock()
+ return nil, err
+ }
sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
ctx.LogD("sp-start", sds, "sending first message")
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
return &state, err
}
-func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) {
+func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) {
started := time.Now()
conf := noise.Config{
CipherSuite: NoiseCipherSuite,
Public: ctx.Self.NoisePub[:],
},
}
+ hs, err := noise.NewHandshakeState(conf)
+ if err != nil {
+ return nil, err
+ }
state := SPState{
ctx: ctx,
- hs: noise.NewHandshakeState(conf),
+ hs: hs,
nice: nice,
payloads: make(chan []byte),
infosOurSeen: make(map[[32]byte]struct{}),
}
var buf []byte
var payload []byte
- var err error
ctx.LogD(
"sp-start",
SDS{"nice": strconv.Itoa(int(nice))},
return nil, err
}
var rxLock *os.File
- if xxOnly != nil && *xxOnly == TRx {
+ if xxOnly == "" || xxOnly == TRx {
rxLock, err = ctx.LockDir(node.Id, TRx)
if err != nil {
return nil, err
}
state.rxLock = rxLock
var txLock *os.File
- if xxOnly != nil && *xxOnly == TTx {
+ if xxOnly == "" || xxOnly == TTx {
txLock, err = ctx.LockDir(node.Id, TTx)
if err != nil {
return nil, err
state.txLock = txLock
var infosPayloads [][]byte
- if xxOnly == nil || *xxOnly != TTx {
+ if xxOnly == "" || xxOnly == TTx {
infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
}
var firstPayload []byte
}
ctx.LogD("sp-start", sds, "sending first message")
- buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
+ buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
+ if err != nil {
+ state.dirUnlock()
+ return nil, err
+ }
conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
if err = state.WriteSP(conn, buf); err != nil {
ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
}
}()
- go func() {
- for range time.Tick(time.Second) {
- for _, payload := range state.ctx.infosOur(
- state.Node.Id,
- state.nice,
- &state.infosOurSeen,
- ) {
- state.ctx.LogD(
- "sp-work",
- SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
- "queuing new info",
- )
- state.payloads <- payload
+ if state.xxOnly == "" || state.xxOnly == TTx {
+ go func() {
+ for range time.Tick(time.Second) {
+ for _, payload := range state.ctx.infosOur(
+ state.Node.Id,
+ state.nice,
+ &state.infosOurSeen,
+ ) {
+ state.ctx.LogD(
+ "sp-work",
+ SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+ "queuing new info",
+ )
+ state.payloads <- payload
+ }
}
- }
- }()
+ }()
+ }
state.wg.Add(1)
go func() {
continue
}
state.ctx.LogD("sp-process", sdsp, "received")
- if state.xxOnly != nil && *state.xxOnly == TTx {
+ if state.xxOnly == TTx {
continue
}
state.Lock()
state.infosTheir[*info.Hash] = &info
state.Unlock()
state.ctx.LogD("sp-process", sdsp, "stating part")
- if _, err = os.Stat(filepath.Join(
+ pktPath := filepath.Join(
state.ctx.Spool,
state.Node.Id.String(),
string(TRx),
ToBase32(info.Hash[:]),
- )); err == nil {
+ )
+ if _, err = os.Stat(pktPath); err == nil {
state.ctx.LogD("sp-process", sdsp, "already done")
replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
continue
}
- fi, err := os.Stat(filepath.Join(
- state.ctx.Spool,
- state.Node.Id.String(),
- string(TRx),
- ToBase32(info.Hash[:])+PartSuffix,
- ))
+ if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+ state.ctx.LogD("sp-process", sdsp, "already seen")
+ replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+ continue
+ }
+ fi, err := os.Stat(pktPath + PartSuffix)
var offset int64
if err == nil {
offset = fi.Size()
continue
}
state.RUnlock()
+ spWorkersGroup.Wait()
+ spWorkersGroup.Add(1)
go func() {
if err := fd.Sync(); err != nil {
state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
state.Lock()
delete(state.infosTheir, *file.Hash)
state.Unlock()
+ spWorkersGroup.Done()
go func() {
state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
}()