]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Bad exit code if queues are not empty
[nncp.git] / src / sp.go
index 1f2e91504dfd7d71c15cb86787fc68afeafb52fb..eca9544945b1e0d734d6b21d692cce85dc68f372 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -27,6 +27,7 @@ import (
        "os"
        "path/filepath"
        "sort"
+       "strconv"
        "sync"
        "time"
 
@@ -39,6 +40,7 @@ const (
        MaxSPSize      = 1<<16 - 256
        PartSuffix     = ".part"
        SPHeadOverhead = 4
+       CfgDeadline    = "NNCPDEADLINE"
 )
 
 type MTHAndOffset struct {
@@ -132,6 +134,14 @@ type ConnDeadlined interface {
 }
 
 func init() {
+       if v := os.Getenv(CfgDeadline); v != "" {
+               i, err := strconv.Atoi(v)
+               if err != nil {
+                       log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
+               }
+               DefaultDeadline = time.Duration(i) * time.Second
+       }
+
        var buf bytes.Buffer
        spHead := SPHead{Type: SPTypeHalt}
        if _, err := xdr.Marshal(&buf, spHead); err != nil {
@@ -1104,7 +1114,7 @@ func (state *SPState) StartWorkers(
        return nil
 }
 
-func (state *SPState) Wait() {
+func (state *SPState) Wait() bool {
        state.wg.Wait()
        close(state.payloads)
        close(state.pings)
@@ -1120,12 +1130,15 @@ func (state *SPState) Wait() {
        if txDuration > 0 {
                state.TxSpeed = state.TxBytes / txDuration
        }
+       nothingLeft := len(state.queueTheir) == 0
        for _, s := range state.fds {
+               nothingLeft = false
                s.fd.Close()
        }
        for pktName := range state.progressBars {
                ProgressKill(pktName)
        }
+       return nothingLeft
 }
 
 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
@@ -1263,7 +1276,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                }
                                continue
                        }
-                       if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+                       if _, err = os.Stat(filepath.Join(
+                               state.Ctx.Spool, state.Node.Id.String(), string(TRx),
+                               SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
+                       )); err == nil {
                                state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
                                        return logMsg(les) + ": already seen"
                                })
@@ -1448,13 +1464,15 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        humanize.IBytes(uint64(fullsize)),
                                )
                        }
-                       err = fd.Sync()
-                       if err != nil {
-                               state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
-                                       return logMsg(les) + ": syncing"
-                               })
-                               state.closeFd(filePathPart)
-                               continue
+                       if !NoSync {
+                               err = fd.Sync()
+                               if err != nil {
+                                       state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
+                                               return logMsg(les) + ": syncing"
+                                       })
+                                       state.closeFd(filePathPart)
+                                       continue
+                               }
                        }
                        if hasherAndOffset != nil {
                                delete(state.fileHashers, filePath)
@@ -1584,7 +1602,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        return fmt.Sprintf("Packet %s is sent", pktName)
                                })
                                if state.Ctx.HdrUsage {
-                                       os.Remove(pth + HdrSuffix)
+                                       os.Remove(JobPath2Hdr(pth))
                                }
                        } else {
                                state.Ctx.LogE("sp-done", lesp, err, logMsg)