]> Cypherpunks.ru repositories - nncp.git/commitdiff
Wait for transfer to complete
authorSergey Matveev <stargrave@stargrave.org>
Sat, 7 Jan 2017 12:12:51 +0000 (15:12 +0300)
committerSergey Matveev <stargrave@stargrave.org>
Sat, 7 Jan 2017 12:12:51 +0000 (15:12 +0300)
src/cypherpunks.ru/nncp/llp.go

index 26e982d0433a57fb25d87ad618df0718c1fe00d0..6b91f2c04426a76420cf8d2a36b4ca53b0382f5f 100644 (file)
@@ -36,8 +36,9 @@ import (
 )
 
 const (
-       MaxLLPSize = 2<<15 - 256
-       PartSuffix = ".part"
+       MaxLLPSize       = 2<<15 - 256
+       PartSuffix       = ".part"
+       DeadlineDuration = 10
 )
 
 var (
@@ -54,8 +55,6 @@ var (
                noise.CipherChaChaPoly,
                noise.HashBLAKE2b,
        )
-
-       DeadlineDuration time.Duration = 10 * time.Second
 )
 
 type LLPType uint8
@@ -167,7 +166,6 @@ type LLPState struct {
        payloads   chan []byte
        infosTheir map[[32]byte]*LLPInfo
        queueTheir []*LLPFreq
-       isDead     bool
        wg         sync.WaitGroup
        RxBytes    int64
        RxLastSeen time.Time
@@ -183,6 +181,11 @@ type LLPState struct {
        sync.RWMutex
 }
 
+func (state *LLPState) isDead() bool {
+       now := time.Now()
+       return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration
+}
+
 func (state *LLPState) dirUnlock() {
        state.ctx.UnlockDir(state.rxLock)
        state.ctx.UnlockDir(state.txLock)
@@ -320,14 +323,14 @@ func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx)
        buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
        sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
        ctx.LogD("llp-start", sds, "sending first message")
-       conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
+       conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
        if err = state.WriteLLP(conn, buf); err != nil {
                ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
                state.dirUnlock()
                return nil, err
        }
        ctx.LogD("llp-start", sds, "waiting for first message")
-       conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
+       conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
        if buf, err = state.ReadLLP(conn); err != nil {
                ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
                state.dirUnlock()
@@ -377,7 +380,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err
                SDS{"nice": strconv.Itoa(int(nice))},
                "waiting for first message",
        )
-       conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
+       conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
        if buf, err = state.ReadLLP(conn); err != nil {
                ctx.LogE("llp-start", SDS{"err": err}, "")
                return nil, err
@@ -436,7 +439,7 @@ func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, err
 
        ctx.LogD("llp-start", sds, "sending first message")
        buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
-       conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
+       conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
        if err = state.WriteLLP(conn, buf); err != nil {
                ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
                state.dirUnlock()
@@ -489,7 +492,7 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo
        go func() {
                defer state.wg.Done()
                for {
-                       if state.isDead {
+                       if state.isDead() {
                                return
                        }
                        var payload []byte
@@ -586,31 +589,32 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo
                                SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
                                "sending",
                        )
-                       conn.SetWriteDeadline(time.Now().Add(DeadlineDuration))
+                       conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
                        if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
                                state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "")
                                break
                        }
                }
-               state.isDead = true
        }()
        state.wg.Add(1)
        go func() {
                defer state.wg.Done()
                for {
-                       if state.isDead {
+                       if state.isDead() {
                                return
                        }
                        state.ctx.LogD("llp-recv", sds, "waiting for payload")
-                       conn.SetReadDeadline(time.Now().Add(DeadlineDuration))
+                       conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
                        payload, err := state.ReadLLP(conn)
                        if err != nil {
                                unmarshalErr := err.(*xdr.UnmarshalError)
                                netErr, ok := unmarshalErr.Err.(net.Error)
-                               if !((ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO) {
+                               if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO {
+                                       continue
+                               } else {
                                        state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
+                                       break
                                }
-                               break
                        }
                        state.ctx.LogD(
                                "llp-recv",
@@ -643,7 +647,6 @@ func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, paylo
                                }
                        }()
                }
-               state.isDead = true
        }()
        return nil
 }
@@ -703,6 +706,16 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) {
                        state.infosTheir[*info.Hash] = &info
                        state.Unlock()
                        state.ctx.LogD("llp-process", sdsp, "stating part")
+                       if _, err = os.Stat(filepath.Join(
+                               state.ctx.Spool,
+                               state.NodeId.String(),
+                               string(TRx),
+                               ToBase32(info.Hash[:]),
+                       )); err == nil {
+                               state.ctx.LogD("llp-process", sdsp, "already done")
+                               replies = append(replies, MarshalLLP(LLPTypeDone, LLPDone{info.Hash}))
+                               continue
+                       }
                        fi, err := os.Stat(filepath.Join(
                                state.ctx.Spool,
                                state.NodeId.String(),
@@ -788,6 +801,8 @@ func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) {
                                        fd.Close()
                                        return
                                }
+                               state.wg.Add(1)
+                               defer state.wg.Done()
                                fd.Seek(0, 0)
                                state.ctx.LogD("llp-file", sdsp, "checking")
                                gut, err := Check(fd, file.Hash[:])