X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fcypherpunks.ru%2Fnncp%2Fsp.go;h=f6b07ce5ad7dac0ac1f6d7ab04f28bab4ae9a5b8;hb=7edc3ed722c8d36e4a99b1cf45f209a973165a37;hp=3200ea45f90d1917cba1077fb0c1912f63b28285;hpb=4238707a98d54b74e9a1732e6266f2d523c65f4c;p=nncp.git diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 3200ea4..f6b07ce 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2017 Sergey Matveev +Copyright (C) 2016-2018 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -36,7 +36,7 @@ import ( ) const ( - MaxSPSize = 2<<16 - 256 + MaxSPSize = 1<<16 - 256 PartSuffix = ".part" DefaultDeadline = 10 ) @@ -55,6 +55,8 @@ var ( noise.CipherChaChaPoly, noise.HashBLAKE2b, ) + + spWorkersGroup sync.WaitGroup ) type SPType uint8 @@ -157,35 +159,43 @@ func payloadsSplit(payloads [][]byte) [][]byte { } type SPState struct { - ctx *Ctx - Node *Node - nice uint8 - hs *noise.HandshakeState - csOur *noise.CipherState - csTheir *noise.CipherState - payloads chan []byte - infosTheir map[[32]byte]*SPInfo - infosOurSeen map[[32]byte]struct{} - queueTheir []*SPFreq - wg sync.WaitGroup - RxBytes int64 - RxLastSeen time.Time - TxBytes int64 - TxLastSeen time.Time - started time.Time - Duration time.Duration - RxSpeed int64 - TxSpeed int64 - rxLock *os.File - txLock *os.File - xxOnly *TRxTx - isDead bool + ctx *Ctx + Node *Node + onlineDeadline uint + maxOnlineTime uint + nice uint8 + hs *noise.HandshakeState + csOur *noise.CipherState + csTheir *noise.CipherState + payloads chan []byte + infosTheir map[[32]byte]*SPInfo + infosOurSeen map[[32]byte]struct{} + queueTheir []*SPFreq + wg sync.WaitGroup + RxBytes int64 + RxLastSeen time.Time + TxBytes int64 + TxLastSeen time.Time + started time.Time + Duration time.Duration + RxSpeed int64 + TxSpeed int64 + rxLock *os.File + txLock *os.File + xxOnly TRxTx + isDead bool sync.RWMutex } func (state *SPState) NotAlive() bool { + if state.isDead { + return true + } now := time.Now() - return state.isDead || (int(now.Sub(state.RxLastSeen).Seconds()) >= state.Node.OnlineDeadline && int(now.Sub(state.TxLastSeen).Seconds()) >= state.Node.OnlineDeadline) + if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { + return true + } + return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline } func (state *SPState) dirUnlock() { @@ -204,7 +214,7 @@ func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { var sp SPRaw - n, err := xdr.UnmarshalLimited(src, &sp, 2<<17) + n, err := xdr.UnmarshalLimited(src, &sp, 1<<17) if err != nil { return nil, err } @@ -256,20 +266,20 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{} return payloadsSplit(payloads) } -func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) { +func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) { err := ctx.ensureRxDir(nodeId) if err != nil { return nil, err } var rxLock *os.File - if xxOnly != nil && *xxOnly == TRx { + if xxOnly == "" || xxOnly == TRx { rxLock, err = ctx.LockDir(nodeId, TRx) if err != nil { return nil, err } } var txLock *os.File - if xxOnly != nil && *xxOnly == TTx { + if xxOnly == "" || xxOnly == TTx { txLock, err = ctx.LockDir(nodeId, TTx) if err != nil { return nil, err @@ -287,22 +297,28 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) }, PeerStatic: node.NoisePub[:], } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return nil, err + } state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - Node: node, - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - infosOurSeen: make(map[[32]byte]struct{}), - started: started, - rxLock: rxLock, - txLock: txLock, - xxOnly: xxOnly, + ctx: ctx, + hs: hs, + Node: node, + onlineDeadline: onlineDeadline, + maxOnlineTime: maxOnlineTime, + nice: nice, + payloads: make(chan []byte), + infosTheir: make(map[[32]byte]*SPInfo), + infosOurSeen: make(map[[32]byte]struct{}), + started: started, + rxLock: rxLock, + txLock: txLock, + xxOnly: xxOnly, } var infosPayloads [][]byte - if xxOnly == nil || *xxOnly != TTx { + if xxOnly == "" || xxOnly == TTx { infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) } var firstPayload []byte @@ -316,7 +332,11 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) var buf []byte var payload []byte - buf, _, _ = state.hs.WriteMessage(nil, firstPayload) + buf, _, _, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return nil, err + } sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} ctx.LogD("sp-start", sds, "sending first message") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) @@ -348,7 +368,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) return &state, err } -func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) { +func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) { started := time.Now() conf := noise.Config{ CipherSuite: NoiseCipherSuite, @@ -359,9 +379,13 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro Public: ctx.Self.NoisePub[:], }, } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return nil, err + } state := SPState{ ctx: ctx, - hs: noise.NewHandshakeState(conf), + hs: hs, nice: nice, payloads: make(chan []byte), infosOurSeen: make(map[[32]byte]struct{}), @@ -371,7 +395,6 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } var buf []byte var payload []byte - var err error ctx.LogD( "sp-start", SDS{"nice": strconv.Itoa(int(nice))}, @@ -399,13 +422,15 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro return nil, errors.New("Unknown peer: " + peerId) } state.Node = node + state.onlineDeadline = node.OnlineDeadline + state.maxOnlineTime = node.MaxOnlineTime sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))} if ctx.ensureRxDir(node.Id); err != nil { return nil, err } var rxLock *os.File - if xxOnly != nil && *xxOnly == TRx { + if xxOnly == "" || xxOnly == TRx { rxLock, err = ctx.LockDir(node.Id, TRx) if err != nil { return nil, err @@ -413,7 +438,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } state.rxLock = rxLock var txLock *os.File - if xxOnly != nil && *xxOnly == TTx { + if xxOnly == "" || xxOnly == TTx { txLock, err = ctx.LockDir(node.Id, TTx) if err != nil { return nil, err @@ -422,7 +447,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro state.txLock = txLock var infosPayloads [][]byte - if xxOnly == nil || *xxOnly != TTx { + if xxOnly == "" || xxOnly == TTx { infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen) } var firstPayload []byte @@ -435,7 +460,11 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } ctx.LogD("sp-start", sds, "sending first message") - buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) + buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return nil, err + } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") @@ -487,22 +516,24 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa } }() - go func() { - for range time.Tick(time.Second) { - for _, payload := range state.ctx.infosOur( - state.Node.Id, - state.nice, - &state.infosOurSeen, - ) { - state.ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "queuing new info", - ) - state.payloads <- payload + if state.xxOnly == "" || state.xxOnly == TTx { + go func() { + for range time.Tick(time.Second) { + for _, payload := range state.ctx.infosOur( + state.Node.Id, + state.nice, + &state.infosOurSeen, + ) { + state.ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + "queuing new info", + ) + state.payloads <- payload + } } - } - }() + }() + } state.wg.Add(1) go func() { @@ -559,7 +590,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa var buf []byte if freq.Offset < fullSize { state.ctx.LogD("sp-file", sdsp, "seeking") - if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { + if _, err = fd.Seek(int64(freq.Offset), 0); err != nil { state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") break } @@ -725,29 +756,30 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } state.ctx.LogD("sp-process", sdsp, "received") - if state.xxOnly != nil && *state.xxOnly == TTx { + if state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() state.ctx.LogD("sp-process", sdsp, "stating part") - if _, err = os.Stat(filepath.Join( + pktPath := filepath.Join( state.ctx.Spool, state.Node.Id.String(), string(TRx), ToBase32(info.Hash[:]), - )); err == nil { + ) + if _, err = os.Stat(pktPath); err == nil { state.ctx.LogD("sp-process", sdsp, "already done") replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) continue } - fi, err := os.Stat(filepath.Join( - state.ctx.Spool, - state.Node.Id.String(), - string(TRx), - ToBase32(info.Hash[:])+PartSuffix, - )) + if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + state.ctx.LogD("sp-process", sdsp, "already seen") + replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) + continue + } + fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { offset = fi.Size() @@ -801,7 +833,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}), "seeking", ) - if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { + if _, err = fd.Seek(int64(file.Offset), 0); err != nil { state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") fd.Close() return nil, err @@ -814,13 +846,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } ourSize := uint64(file.Offset) + uint64(len(file.Payload)) + state.RLock() sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) state.ctx.LogP("sp-file", sdsp, "") if state.infosTheir[*file.Hash].Size != ourSize { + state.RUnlock() fd.Close() continue } + state.RUnlock() + spWorkersGroup.Wait() + spWorkersGroup.Add(1) go func() { if err := fd.Sync(); err != nil { state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") @@ -829,7 +866,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.wg.Add(1) defer state.wg.Done() - fd.Seek(0, io.SeekStart) + fd.Seek(0, 0) state.ctx.LogD("sp-file", sdsp, "checking") gut, err := Check(fd, file.Hash[:]) fd.Close() @@ -839,6 +876,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") os.Rename(filePath+PartSuffix, filePath) + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + spWorkersGroup.Done() go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) }() @@ -903,10 +944,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if infosGot { var pkts int var size uint64 + state.RLock() for _, info := range state.infosTheir { pkts++ size += info.Size } + state.RUnlock() state.ctx.LogI("sp-infos", SDS{ "xx": string(TRx), "node": state.Node.Id,