]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/cypherpunks.ru/nncp/sp.go
SP checks .seen existence
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
index be845109752a79b2393dfdb8be20b716c5584621..f6b07ce5ad7dac0ac1f6d7ab04f28bab4ae9a5b8 100644 (file)
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -55,6 +55,8 @@ var (
                noise.CipherChaChaPoly,
                noise.HashBLAKE2b,
        )
+
+       spWorkersGroup sync.WaitGroup
 )
 
 type SPType uint8
@@ -295,9 +297,13 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx,
                },
                PeerStatic: node.NoisePub[:],
        }
+       hs, err := noise.NewHandshakeState(conf)
+       if err != nil {
+               return nil, err
+       }
        state := SPState{
                ctx:            ctx,
-               hs:             noise.NewHandshakeState(conf),
+               hs:             hs,
                Node:           node,
                onlineDeadline: onlineDeadline,
                maxOnlineTime:  maxOnlineTime,
@@ -326,7 +332,11 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx,
 
        var buf []byte
        var payload []byte
-       buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
+       buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
+       if err != nil {
+               state.dirUnlock()
+               return nil, err
+       }
        sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
        ctx.LogD("sp-start", sds, "sending first message")
        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
@@ -369,9 +379,13 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error
                        Public:  ctx.Self.NoisePub[:],
                },
        }
+       hs, err := noise.NewHandshakeState(conf)
+       if err != nil {
+               return nil, err
+       }
        state := SPState{
                ctx:          ctx,
-               hs:           noise.NewHandshakeState(conf),
+               hs:           hs,
                nice:         nice,
                payloads:     make(chan []byte),
                infosOurSeen: make(map[[32]byte]struct{}),
@@ -381,7 +395,6 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error
        }
        var buf []byte
        var payload []byte
-       var err error
        ctx.LogD(
                "sp-start",
                SDS{"nice": strconv.Itoa(int(nice))},
@@ -447,7 +460,11 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error
        }
 
        ctx.LogD("sp-start", sds, "sending first message")
-       buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
+       buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
+       if err != nil {
+               state.dirUnlock()
+               return nil, err
+       }
        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
        if err = state.WriteSP(conn, buf); err != nil {
                ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
@@ -746,22 +763,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.infosTheir[*info.Hash] = &info
                        state.Unlock()
                        state.ctx.LogD("sp-process", sdsp, "stating part")
-                       if _, err = os.Stat(filepath.Join(
+                       pktPath := filepath.Join(
                                state.ctx.Spool,
                                state.Node.Id.String(),
                                string(TRx),
                                ToBase32(info.Hash[:]),
-                       )); err == nil {
+                       )
+                       if _, err = os.Stat(pktPath); err == nil {
                                state.ctx.LogD("sp-process", sdsp, "already done")
                                replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
                                continue
                        }
-                       fi, err := os.Stat(filepath.Join(
-                               state.ctx.Spool,
-                               state.Node.Id.String(),
-                               string(TRx),
-                               ToBase32(info.Hash[:])+PartSuffix,
-                       ))
+                       if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+                               state.ctx.LogD("sp-process", sdsp, "already seen")
+                               replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
+                               continue
+                       }
+                       fi, err := os.Stat(pktPath + PartSuffix)
                        var offset int64
                        if err == nil {
                                offset = fi.Size()
@@ -838,6 +856,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                continue
                        }
                        state.RUnlock()
+                       spWorkersGroup.Wait()
+                       spWorkersGroup.Add(1)
                        go func() {
                                if err := fd.Sync(); err != nil {
                                        state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
@@ -859,6 +879,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                state.Lock()
                                delete(state.infosTheir, *file.Hash)
                                state.Unlock()
+                               spWorkersGroup.Done()
                                go func() {
                                        state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
                                }()