/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
"errors"
"fmt"
"io"
+ "log"
"os"
"path/filepath"
"sort"
+ "strconv"
"sync"
"time"
MaxSPSize = 1<<16 - 256
PartSuffix = ".part"
SPHeadOverhead = 4
+ CfgDeadline = "NNCPDEADLINE"
)
type MTHAndOffset struct {
}
func init() {
+ if v := os.Getenv(CfgDeadline); v != "" {
+ i, err := strconv.Atoi(v)
+ if err != nil {
+ log.Fatalln("Can not convert", CfgDeadline, "to integer:", err)
+ }
+ DefaultDeadline = time.Duration(i) * time.Second
+ }
+
var buf bytes.Buffer
spHead := SPHead{Type: SPTypeHalt}
if _, err := xdr.Marshal(&buf, spHead); err != nil {
NicenessFmt(state.Nice),
)
})
- conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
if err = state.WriteSP(conn, buf, false); err != nil {
state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
return fmt.Sprintf(
NicenessFmt(state.Nice),
)
})
- conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
if buf, err = state.ReadSP(conn); err != nil {
state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
return fmt.Sprintf(
}
les := LEs{{"Nice", int(state.Nice)}}
state.Ctx.LogD("sp-startR", les, logMsg)
- conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
if buf, err = state.ReadSP(conn); err != nil {
state.Ctx.LogE("sp-startR-read", les, err, logMsg)
return err
var node *Node
for _, n := range state.Ctx.Neigh {
+ if n.NoisePub == nil {
+ continue
+ }
if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
node = n
break
state.dirUnlock()
return err
}
- conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
if err = state.WriteSP(conn, buf, false); err != nil {
state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
return fmt.Sprintf(
break
Deadlined:
state.SetDead()
- conn.Close() // #nosec G104
+ conn.Close()
case now := <-pingTicker.C:
if now.After(state.TxLastSeen.Add(PingTimeout)) {
state.wg.Add(1)
if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
state.wg.Add(1)
go func() {
- ticker := time.NewTicker(time.Second)
+ dw, err := state.Ctx.NewDirWatcher(
+ filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
+ time.Second,
+ )
+ if err != nil {
+ state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
+ log.Fatalln(err)
+ }
for {
select {
case <-state.isDead:
+ dw.Close()
state.wg.Done()
- ticker.Stop()
return
- case <-ticker.C:
+ case <-dw.C:
for _, payload := range state.Ctx.infosOur(
state.Node.Id,
state.Nice,
fdAndFullSize, exists := state.fds[pth]
state.fdsLock.RUnlock()
if !exists {
+ state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
+ return logMsg(les) + ": opening"
+ })
fd, err := os.Open(pth)
if err != nil {
state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
}
fd := fdAndFullSize.fd
fullSize := fdAndFullSize.fullSize
+ lesp = append(lesp, LE{"FullSize", fullSize})
var bufRead []byte
if freq.Offset < uint64(fullSize) {
state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
LE{"XX", string(TTx)},
LE{"Pkt", pktName},
LE{"Size", int64(n)},
+ LE{"FullSize", fullSize},
)
state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
return fmt.Sprintf(
logMsg(les), humanize.IBytes(uint64(n)),
)
})
+ } else {
+ state.closeFd(pth)
}
- state.closeFd(pth)
payload = MarshalSP(SPTypeFile, SPFile{
Hash: freq.Hash,
Offset: freq.Offset,
state.progressBars[pktName] = struct{}{}
Progress("Tx", lesp)
}
+ if ourSize == uint64(fullSize) {
+ state.closeFd(pth)
+ state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
+ return logMsg(les) + ": finished"
+ })
+ if state.Ctx.ShowPrgrs {
+ delete(state.progressBars, pktName)
+ }
+ }
state.Lock()
for i, q := range state.queueTheir {
if *q.freq.Hash != *freq.Hash {
continue
}
if ourSize == uint64(fullSize) {
- state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
- return logMsg(les) + ": finished"
- })
state.queueTheir = append(
state.queueTheir[:i],
state.queueTheir[i+1:]...,
)
- if state.Ctx.ShowPrgrs {
- delete(state.progressBars, pktName)
- }
} else {
q.freq.Offset = ourSize
}
)
}
state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
- conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
ct, err := state.csOur.Encrypt(nil, nil, payload)
if err != nil {
state.Ctx.LogE("sp-encrypting", les, err, logMsg)
)
}
state.Ctx.LogD("sp-recv-wait", les, logMsg)
- conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
+ conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
payload, err := state.ReadSP(conn)
if err != nil {
if err == io.EOF {
state.SetDead()
state.wg.Done()
state.SetDead()
- conn.Close() // #nosec G104
+ conn.Close()
}()
return nil
}
continue
}
- if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
+ if _, err = os.Stat(filepath.Join(
+ state.Ctx.Spool, state.Node.Id.String(), string(TRx),
+ SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
+ )); err == nil {
state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
return logMsg(les) + ": already seen"
})
}
fullsize := int64(0)
state.RLock()
- infoTheir, ok := state.infosTheir[*file.Hash]
+ infoTheir := state.infosTheir[*file.Hash]
state.RUnlock()
- if !ok {
+ if infoTheir == nil {
state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
return logMsg(les) + ": unknown file"
})
}
if hasherAndOffset != nil {
delete(state.fileHashers, filePath)
- if hasherAndOffset.mth.PrependSize() == 0 {
+ if hasherAndOffset.mth.PreaddSize() == 0 {
if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
state.Ctx.LogE(
"sp-file-bad-checksum", lesp,
return fmt.Sprintf("Packet %s is sent", pktName)
})
if state.Ctx.HdrUsage {
- os.Remove(pth + HdrSuffix)
+ os.Remove(JobPath2Hdr(pth))
}
} else {
state.Ctx.LogE("sp-done", lesp, err, logMsg)