/*
NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
import (
"bytes"
"fmt"
+ "io"
"os"
"path/filepath"
"strings"
TRx TRxTx = "rx"
TTx TRxTx = "tx"
- HdrSuffix = ".hdr"
+ HdrDir = "hdr"
)
type Job struct {
HshValue *[MTHSize]byte
}
-func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) {
+func JobPath2Hdr(jobPath string) string {
+ return filepath.Join(filepath.Dir(jobPath), HdrDir, filepath.Base(jobPath))
+}
+
+func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) {
var pktEnc PktEnc
- _, err := xdr.Unmarshal(fd, &pktEnc)
+ _, err := xdr.Unmarshal(r, &pktEnc)
if err != nil {
return nil, nil, err
}
os.Remove(tmpHdr.Name())
return err
}
- if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
+ if err = ensureDir(filepath.Dir(tgt), HdrDir); err != nil {
+ ctx.LogE("hdr-write-ensure-mkdir", nil, err, func(les LEs) string {
+ return "Header writing: ensuring directory"
+ })
+ return err
+ }
+ if err = os.Rename(tmpHdr.Name(), JobPath2Hdr(tgt)); err != nil {
ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string {
return "Header writing: renaming"
})
return err
}
-func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job {
+func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job {
rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
jobs := make(chan Job, 16)
go func() {
return
}
fis, err := dir.Readdir(0)
- dir.Close() // #nosec G104
+ dir.Close()
if err != nil {
return
}
hshValue, err = Base32Codec.DecodeString(
strings.TrimSuffix(name, NoCKSuffix),
)
+ } else if part {
+ if !strings.HasSuffix(name, PartSuffix) ||
+ len(name) != Base32Encoded32Len+len(PartSuffix) {
+ continue
+ }
+ hshValue, err = Base32Codec.DecodeString(
+ strings.TrimSuffix(name, PartSuffix),
+ )
} else {
if len(name) != Base32Encoded32Len {
continue
pth := filepath.Join(rxPath, name)
hdrExists := true
var fd *os.File
- if nock {
+ if nock || part {
fd, err = os.Open(pth)
} else {
- fd, err = os.Open(pth + HdrSuffix)
+ fd, err = os.Open(JobPath2Hdr(pth))
if err != nil && os.IsNotExist(err) {
hdrExists = false
fd, err = os.Open(pth)
if err != nil {
continue
}
+ if part {
+ job := Job{
+ Path: pth,
+ Size: fi.Size(),
+ HshValue: new([MTHSize]byte),
+ }
+ copy(job.HshValue[:], hshValue)
+ jobs <- job
+ continue
+ }
pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
fd.Close()
- if err != nil || pktEnc.Magic != MagicNNCPEv5 {
+ if err != nil {
+ continue
+ }
+ switch pktEnc.Magic {
+ case MagicNNCPEv1.B:
+ err = MagicNNCPEv1.TooOld()
+ case MagicNNCPEv2.B:
+ err = MagicNNCPEv2.TooOld()
+ case MagicNNCPEv3.B:
+ err = MagicNNCPEv3.TooOld()
+ case MagicNNCPEv4.B:
+ err = MagicNNCPEv4.TooOld()
+ case MagicNNCPEv5.B:
+ err = MagicNNCPEv5.TooOld()
+ case MagicNNCPEv6.B:
+ default:
+ err = BadMagic
+ }
+ if err != nil {
+ ctx.LogE("job", LEs{
+ {"XX", string(xx)},
+ {"Name", name},
+ {"Size", fi.Size()},
+ }, err, func(les LEs) string {
+ return fmt.Sprintf(
+ "Job %s/%s size: %s",
+ string(xx), name,
+ humanize.IBytes(uint64(fi.Size())),
+ )
+ })
continue
}
ctx.LogD("job", LEs{
}
func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
- return ctx.jobsFind(nodeId, xx, false)
+ return ctx.jobsFind(nodeId, xx, false, false)
}
func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
- return ctx.jobsFind(nodeId, TRx, true)
+ return ctx.jobsFind(nodeId, TRx, true, false)
+}
+
+func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job {
+ return ctx.jobsFind(nodeId, TRx, false, true)
}