X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fcypherpunks.ru%2Fnncp%2Fsp.go;h=f6b07ce5ad7dac0ac1f6d7ab04f28bab4ae9a5b8;hb=7edc3ed722c8d36e4a99b1cf45f209a973165a37;hp=8b64be1a68e24505513fa4d6476a895a525df180;hpb=8c96237da230cf7306d2cc460164879d6d1bbce4;p=nncp.git diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 8b64be1..f6b07ce 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2017 Sergey Matveev +Copyright (C) 2016-2018 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -55,6 +55,8 @@ var ( noise.CipherChaChaPoly, noise.HashBLAKE2b, ) + + spWorkersGroup sync.WaitGroup ) type SPType uint8 @@ -180,7 +182,7 @@ type SPState struct { TxSpeed int64 rxLock *os.File txLock *os.File - xxOnly *TRxTx + xxOnly TRxTx isDead bool sync.RWMutex } @@ -264,20 +266,20 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{} return payloadsSplit(payloads) } -func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) { +func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) { err := ctx.ensureRxDir(nodeId) if err != nil { return nil, err } var rxLock *os.File - if xxOnly != nil && *xxOnly == TRx { + if xxOnly == "" || xxOnly == TRx { rxLock, err = ctx.LockDir(nodeId, TRx) if err != nil { return nil, err } } var txLock *os.File - if xxOnly != nil && *xxOnly == TTx { + if xxOnly == "" || xxOnly == TTx { txLock, err = ctx.LockDir(nodeId, TTx) if err != nil { return nil, err @@ -295,9 +297,13 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, }, PeerStatic: node.NoisePub[:], } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return nil, err + } state := SPState{ ctx: ctx, - hs: noise.NewHandshakeState(conf), + hs: hs, Node: node, onlineDeadline: onlineDeadline, maxOnlineTime: maxOnlineTime, @@ -312,7 +318,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, } var infosPayloads [][]byte - if xxOnly == nil || *xxOnly != TTx { + if xxOnly == "" || xxOnly == TTx { infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) } var firstPayload []byte @@ -326,7 +332,11 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, var buf []byte var payload []byte - buf, _, _ = state.hs.WriteMessage(nil, firstPayload) + buf, _, _, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return nil, err + } sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} ctx.LogD("sp-start", sds, "sending first message") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) @@ -358,7 +368,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx, return &state, err } -func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) { +func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) { started := time.Now() conf := noise.Config{ CipherSuite: NoiseCipherSuite, @@ -369,9 +379,13 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro Public: ctx.Self.NoisePub[:], }, } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return nil, err + } state := SPState{ ctx: ctx, - hs: noise.NewHandshakeState(conf), + hs: hs, nice: nice, payloads: make(chan []byte), infosOurSeen: make(map[[32]byte]struct{}), @@ -381,7 +395,6 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } var buf []byte var payload []byte - var err error ctx.LogD( "sp-start", SDS{"nice": strconv.Itoa(int(nice))}, @@ -417,7 +430,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro return nil, err } var rxLock *os.File - if xxOnly != nil && *xxOnly == TRx { + if xxOnly == "" || xxOnly == TRx { rxLock, err = ctx.LockDir(node.Id, TRx) if err != nil { return nil, err @@ -425,7 +438,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } state.rxLock = rxLock var txLock *os.File - if xxOnly != nil && *xxOnly == TTx { + if xxOnly == "" || xxOnly == TTx { txLock, err = ctx.LockDir(node.Id, TTx) if err != nil { return nil, err @@ -434,7 +447,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro state.txLock = txLock var infosPayloads [][]byte - if xxOnly == nil || *xxOnly != TTx { + if xxOnly == "" || xxOnly == TTx { infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen) } var firstPayload []byte @@ -447,7 +460,11 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } ctx.LogD("sp-start", sds, "sending first message") - buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) + buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return nil, err + } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") @@ -499,22 +516,24 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa } }() - go func() { - for range time.Tick(time.Second) { - for _, payload := range state.ctx.infosOur( - state.Node.Id, - state.nice, - &state.infosOurSeen, - ) { - state.ctx.LogD( - "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), - "queuing new info", - ) - state.payloads <- payload + if state.xxOnly == "" || state.xxOnly == TTx { + go func() { + for range time.Tick(time.Second) { + for _, payload := range state.ctx.infosOur( + state.Node.Id, + state.nice, + &state.infosOurSeen, + ) { + state.ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + "queuing new info", + ) + state.payloads <- payload + } } - } - }() + }() + } state.wg.Add(1) go func() { @@ -737,29 +756,30 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } state.ctx.LogD("sp-process", sdsp, "received") - if state.xxOnly != nil && *state.xxOnly == TTx { + if state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() state.ctx.LogD("sp-process", sdsp, "stating part") - if _, err = os.Stat(filepath.Join( + pktPath := filepath.Join( state.ctx.Spool, state.Node.Id.String(), string(TRx), ToBase32(info.Hash[:]), - )); err == nil { + ) + if _, err = os.Stat(pktPath); err == nil { state.ctx.LogD("sp-process", sdsp, "already done") replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) continue } - fi, err := os.Stat(filepath.Join( - state.ctx.Spool, - state.Node.Id.String(), - string(TRx), - ToBase32(info.Hash[:])+PartSuffix, - )) + if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + state.ctx.LogD("sp-process", sdsp, "already seen") + replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) + continue + } + fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { offset = fi.Size() @@ -836,6 +856,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } state.RUnlock() + spWorkersGroup.Wait() + spWorkersGroup.Add(1) go func() { if err := fd.Sync(); err != nil { state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") @@ -857,6 +879,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() + spWorkersGroup.Done() go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) }()