/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2023 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
"os"
"path/filepath"
"sort"
+ "strconv"
"sync"
"time"
MaxSPSize = 1<<16 - 256
PartSuffix = ".part"
SPHeadOverhead = 4
+ CfgDeadline = "NNCPDEADLINE"
)
type MTHAndOffset struct {
}
func init() {
+ if v := os.Getenv(CfgDeadline); v != "" {
+ i, err := strconv.Atoi(v)
+ if err != nil {
+ log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
+ }
+ DefaultDeadline = time.Duration(i) * time.Second
+ }
+
var buf bytes.Buffer
spHead := SPHead{Type: SPTypeHalt}
if _, err := xdr.Marshal(&buf, spHead); err != nil {
func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
state.writeSPBuf.Reset()
- n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
+ if _, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
Magic: MagicNNCPSv1.B,
Payload: payload,
- })
+ }); err != nil {
+ return err
+ }
+ n, err := dst.Write(state.writeSPBuf.Bytes())
if err != nil {
return err
}
- if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
- state.TxLastSeen = time.Now()
- state.TxBytes += int64(n)
- if !ping {
- state.TxLastNonPing = state.TxLastSeen
- }
+ state.TxLastSeen = time.Now()
+ state.TxBytes += int64(n)
+ if !ping {
+ state.TxLastNonPing = state.TxLastSeen
}
- return err
+ return nil
}
func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
return nil
}
-func (state *SPState) Wait() {
+func (state *SPState) Wait() bool {
state.wg.Wait()
close(state.payloads)
close(state.pings)
- state.Duration = time.Now().Sub(state.started)
+ state.Duration = time.Since(state.started)
state.dirUnlock()
state.RxSpeed = state.RxBytes
state.TxSpeed = state.TxBytes
if txDuration > 0 {
state.TxSpeed = state.TxBytes / txDuration
}
+ nothingLeft := len(state.queueTheir) == 0
for _, s := range state.fds {
+ nothingLeft = false
s.fd.Close()
}
for pktName := range state.progressBars {
ProgressKill(pktName)
}
+ return nothingLeft
}
func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
humanize.IBytes(uint64(fullsize)),
)
}
- err = fd.Sync()
- if err != nil {
- state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
- return logMsg(les) + ": syncing"
- })
- state.closeFd(filePathPart)
- continue
+ if !NoSync {
+ err = fd.Sync()
+ if err != nil {
+ state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
+ return logMsg(les) + ": syncing"
+ })
+ state.closeFd(filePathPart)
+ continue
+ }
}
if hasherAndOffset != nil {
delete(state.fileHashers, filePath)
if hasherAndOffset.mth.PreaddSize() == 0 {
- if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
+ if !bytes.Equal(hasherAndOffset.mth.Sum(nil), file.Hash[:]) {
state.Ctx.LogE(
"sp-file-bad-checksum", lesp,
errors.New("checksum mismatch"),