X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fcypherpunks.ru%2Fnncp%2Fsp.go;h=f6b07ce5ad7dac0ac1f6d7ab04f28bab4ae9a5b8;hb=7edc3ed722c8d36e4a99b1cf45f209a973165a37;hp=78a0d96d407a01255d22461542f8c6db3a119b49;hpb=39192ab56b0f14679b927d95d3a6c3f15ce77c62;p=nncp.git diff --git a/src/cypherpunks.ru/nncp/sp.go b/src/cypherpunks.ru/nncp/sp.go index 78a0d96..f6b07ce 100644 --- a/src/cypherpunks.ru/nncp/sp.go +++ b/src/cypherpunks.ru/nncp/sp.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2017 Sergey Matveev +Copyright (C) 2016-2018 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -36,13 +36,13 @@ import ( ) const ( - MaxSPSize = 2<<15 - 256 - PartSuffix = ".part" - DeadlineDuration = 10 + MaxSPSize = 1<<16 - 256 + PartSuffix = ".part" + DefaultDeadline = 10 ) var ( - MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'L', 1, 0, 0} + MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} SPHeadOverhead int SPInfoOverhead int @@ -55,6 +55,8 @@ var ( noise.CipherChaChaPoly, noise.HashBLAKE2b, ) + + spWorkersGroup sync.WaitGroup ) type SPType uint8 @@ -157,33 +159,43 @@ func payloadsSplit(payloads [][]byte) [][]byte { } type SPState struct { - ctx *Ctx - NodeId *NodeId - nice uint8 - hs *noise.HandshakeState - csOur *noise.CipherState - csTheir *noise.CipherState - payloads chan []byte - infosTheir map[[32]byte]*SPInfo - queueTheir []*SPFreq - wg sync.WaitGroup - RxBytes int64 - RxLastSeen time.Time - TxBytes int64 - TxLastSeen time.Time - started time.Time - Duration time.Duration - RxSpeed int64 - TxSpeed int64 - rxLock *os.File - txLock *os.File - xxOnly *TRxTx + ctx *Ctx + Node *Node + onlineDeadline uint + maxOnlineTime uint + nice uint8 + hs *noise.HandshakeState + csOur *noise.CipherState + csTheir *noise.CipherState + payloads chan []byte + infosTheir map[[32]byte]*SPInfo + infosOurSeen map[[32]byte]struct{} + queueTheir []*SPFreq + wg sync.WaitGroup + RxBytes int64 + RxLastSeen time.Time + TxBytes int64 + TxLastSeen time.Time + started time.Time + Duration time.Duration + RxSpeed int64 + TxSpeed int64 + rxLock *os.File + txLock *os.File + xxOnly TRxTx + isDead bool sync.RWMutex } -func (state *SPState) isDead() bool { +func (state *SPState) NotAlive() bool { + if state.isDead { + return true + } now := time.Now() - return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration + if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) { + return true + } + return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline } func (state *SPState) dirUnlock() { @@ -202,7 +214,7 @@ func (state *SPState) WriteSP(dst io.Writer, payload []byte) error { func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { var sp SPRaw - n, err := xdr.Unmarshal(src, &sp) + n, err := xdr.UnmarshalLimited(src, &sp, 1<<17) if err != nil { return nil, err } @@ -214,7 +226,7 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { return sp.Payload, nil } -func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { +func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { @@ -222,12 +234,16 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { if job.PktEnc.Nice > nice { continue } + if _, known := (*seen)[*job.HshValue]; known { + continue + } totalSize += job.Size infos = append(infos, &SPInfo{ Nice: job.PktEnc.Nice, Size: uint64(job.Size), Hash: job.HshValue, }) + (*seen)[*job.HshValue] = struct{}{} } sort.Sort(ByNice(infos)) var payloads [][]byte @@ -239,35 +255,38 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte { "size": strconv.FormatInt(int64(info.Size), 10), }, "") } - ctx.LogI("sp-infos", SDS{ - "xx": string(TTx), - "node": nodeId, - "pkts": strconv.Itoa(len(payloads)), - "size": strconv.FormatInt(totalSize, 10), - }, "") + if totalSize > 0 { + ctx.LogI("sp-infos", SDS{ + "xx": string(TTx), + "node": nodeId, + "pkts": strconv.Itoa(len(payloads)), + "size": strconv.FormatInt(totalSize, 10), + }, "") + } return payloadsSplit(payloads) } -func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) { +func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) { err := ctx.ensureRxDir(nodeId) if err != nil { return nil, err } var rxLock *os.File - if xxOnly != nil && *xxOnly == TRx { + if xxOnly == "" || xxOnly == TRx { rxLock, err = ctx.LockDir(nodeId, TRx) if err != nil { return nil, err } } var txLock *os.File - if xxOnly != nil && *xxOnly == TTx { + if xxOnly == "" || xxOnly == TTx { txLock, err = ctx.LockDir(nodeId, TTx) if err != nil { return nil, err } } started := time.Now() + node := ctx.Neigh[*nodeId] conf := noise.Config{ CipherSuite: NoiseCipherSuite, Pattern: noise.HandshakeIK, @@ -276,47 +295,58 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) Private: ctx.Self.NoisePrv[:], Public: ctx.Self.NoisePub[:], }, - PeerStatic: ctx.Neigh[*nodeId].NoisePub[:], + PeerStatic: node.NoisePub[:], + } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return nil, err } state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - NodeId: nodeId, - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - started: started, - rxLock: rxLock, - txLock: txLock, - xxOnly: xxOnly, + ctx: ctx, + hs: hs, + Node: node, + onlineDeadline: onlineDeadline, + maxOnlineTime: maxOnlineTime, + nice: nice, + payloads: make(chan []byte), + infosTheir: make(map[[32]byte]*SPInfo), + infosOurSeen: make(map[[32]byte]struct{}), + started: started, + rxLock: rxLock, + txLock: txLock, + xxOnly: xxOnly, } var infosPayloads [][]byte - if xxOnly == nil || *xxOnly != TTx { - infosPayloads = ctx.infosOur(nodeId, nice) + if xxOnly == "" || xxOnly == TTx { + infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { firstPayload = infosPayloads[0] } - // Pad first payload, to hide actual existing files + // Pad first payload, to hide actual number of existing files for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ { firstPayload = append(firstPayload, SPHaltMarshalized...) } var buf []byte var payload []byte - buf, _, _ = state.hs.WriteMessage(nil, firstPayload) + buf, _, _, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return nil, err + } sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} ctx.LogD("sp-start", sds, "sending first message") - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() return nil, err } ctx.LogD("sp-start", sds, "waiting for first message") - conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() @@ -338,7 +368,7 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) return &state, err } -func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) { +func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) { started := time.Now() conf := noise.Config{ CipherSuite: NoiseCipherSuite, @@ -349,24 +379,28 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro Public: ctx.Self.NoisePub[:], }, } + hs, err := noise.NewHandshakeState(conf) + if err != nil { + return nil, err + } state := SPState{ - ctx: ctx, - hs: noise.NewHandshakeState(conf), - nice: nice, - payloads: make(chan []byte), - infosTheir: make(map[[32]byte]*SPInfo), - started: started, - xxOnly: xxOnly, + ctx: ctx, + hs: hs, + nice: nice, + payloads: make(chan []byte), + infosOurSeen: make(map[[32]byte]struct{}), + infosTheir: make(map[[32]byte]*SPInfo), + started: started, + xxOnly: xxOnly, } var buf []byte var payload []byte - var err error ctx.LogD( "sp-start", SDS{"nice": strconv.Itoa(int(nice))}, "waiting for first message", ) - conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { ctx.LogE("sp-start", SDS{"err": err}, "") return nil, err @@ -376,35 +410,36 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro return nil, err } - var nodeId *NodeId - for _, node := range ctx.Neigh { + var node *Node + for _, node = range ctx.Neigh { if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 { - nodeId = node.Id break } } - if nodeId == nil { + if node == nil { peerId := ToBase32(state.hs.PeerStatic()) ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") return nil, errors.New("Unknown peer: " + peerId) } - state.NodeId = nodeId - sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))} + state.Node = node + state.onlineDeadline = node.OnlineDeadline + state.maxOnlineTime = node.MaxOnlineTime + sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))} - if ctx.ensureRxDir(nodeId); err != nil { + if ctx.ensureRxDir(node.Id); err != nil { return nil, err } var rxLock *os.File - if xxOnly != nil && *xxOnly == TRx { - rxLock, err = ctx.LockDir(nodeId, TRx) + if xxOnly == "" || xxOnly == TRx { + rxLock, err = ctx.LockDir(node.Id, TRx) if err != nil { return nil, err } } state.rxLock = rxLock var txLock *os.File - if xxOnly != nil && *xxOnly == TTx { - txLock, err = ctx.LockDir(nodeId, TTx) + if xxOnly == "" || xxOnly == TTx { + txLock, err = ctx.LockDir(node.Id, TTx) if err != nil { return nil, err } @@ -412,21 +447,25 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro state.txLock = txLock var infosPayloads [][]byte - if xxOnly == nil || *xxOnly != TTx { - infosPayloads = ctx.infosOur(nodeId, nice) + if xxOnly == "" || xxOnly == TTx { + infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen) } var firstPayload []byte if len(infosPayloads) > 0 { firstPayload = infosPayloads[0] } - // Pad first payload, to hide actual existing files + // Pad first payload, to hide actual number of existing files for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ { firstPayload = append(firstPayload, SPHaltMarshalized...) } ctx.LogD("sp-start", sds, "sending first message") - buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload) - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) + buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) + if err != nil { + state.dirUnlock() + return nil, err + } + conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") state.dirUnlock() @@ -442,7 +481,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, erro } func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error { - sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))} + sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))} if len(infosPayloads) > 1 { go func() { for _, payload := range infosPayloads[1:] { @@ -465,6 +504,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") return err } + go func() { for _, reply := range replies { state.ctx.LogD( @@ -475,11 +515,34 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.payloads <- reply } }() + + if state.xxOnly == "" || state.xxOnly == TTx { + go func() { + for range time.Tick(time.Second) { + for _, payload := range state.ctx.infosOur( + state.Node.Id, + state.nice, + &state.infosOurSeen, + ) { + state.ctx.LogD( + "sp-work", + SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + "queuing new info", + ) + state.payloads <- payload + } + } + }() + } + state.wg.Add(1) go func() { - defer state.wg.Done() + defer func() { + state.isDead = true + state.wg.Done() + }() for { - if state.isDead() { + if state.NotAlive() { return } var payload []byte @@ -510,7 +573,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa state.ctx.LogD("sp-file", sdsp, "queueing") fd, err := os.Open(filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TTx), ToBase32(freq.Hash[:]), )) @@ -576,32 +639,38 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), "sending", ) - conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") break } } }() + state.wg.Add(1) go func() { - defer state.wg.Done() + defer func() { + state.isDead = true + state.wg.Done() + }() for { - if state.isDead() { + if state.NotAlive() { return } state.ctx.LogD("sp-recv", sds, "waiting for payload") - conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second)) + conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) payload, err := state.ReadSP(conn) if err != nil { unmarshalErr := err.(*xdr.UnmarshalError) netErr, ok := unmarshalErr.Err.(net.Error) - if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO { + if ok && netErr.Timeout() { continue - } else { - state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + } + if unmarshalErr.ErrorCode == xdr.ErrIO { break } + state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + break } state.ctx.LogD( "sp-recv", @@ -635,6 +704,7 @@ func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payloa }() } }() + return nil } @@ -655,7 +725,7 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))} + sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))} r := bytes.NewReader(payload) var err error var replies [][]byte @@ -686,29 +756,30 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } state.ctx.LogD("sp-process", sdsp, "received") - if state.xxOnly != nil && *state.xxOnly == TTx { + if state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() state.ctx.LogD("sp-process", sdsp, "stating part") - if _, err = os.Stat(filepath.Join( + pktPath := filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TRx), ToBase32(info.Hash[:]), - )); err == nil { + ) + if _, err = os.Stat(pktPath); err == nil { state.ctx.LogD("sp-process", sdsp, "already done") replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) continue } - fi, err := os.Stat(filepath.Join( - state.ctx.Spool, - state.NodeId.String(), - string(TRx), - ToBase32(info.Hash[:])+PartSuffix, - )) + if _, err = os.Stat(pktPath + SeenSuffix); err == nil { + state.ctx.LogD("sp-process", sdsp, "already seen") + replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) + continue + } + fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { offset = fi.Size() @@ -743,7 +814,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { }) filePath := filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TRx), ToBase32(file.Hash[:]), ) @@ -775,13 +846,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { return nil, err } ourSize := uint64(file.Offset) + uint64(len(file.Payload)) + state.RLock() sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) state.ctx.LogP("sp-file", sdsp, "") if state.infosTheir[*file.Hash].Size != ourSize { + state.RUnlock() fd.Close() continue } + state.RUnlock() + spWorkersGroup.Wait() + spWorkersGroup.Add(1) go func() { if err := fd.Sync(); err != nil { state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") @@ -800,6 +876,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") os.Rename(filePath+PartSuffix, filePath) + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + spWorkersGroup.Done() go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) }() @@ -822,7 +902,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( state.ctx.Spool, - state.NodeId.String(), + state.Node.Id.String(), string(TTx), ToBase32(done.Hash[:]), )) @@ -864,13 +944,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if infosGot { var pkts int var size uint64 + state.RLock() for _, info := range state.infosTheir { pkts++ size += info.Size } + state.RUnlock() state.ctx.LogI("sp-infos", SDS{ "xx": string(TRx), - "node": state.NodeId, + "node": state.Node.Id, "pkts": strconv.Itoa(pkts), "size": strconv.FormatInt(int64(size), 10), }, "")