package nncp
import (
- "io"
+ "bytes"
+ "fmt"
"os"
"path/filepath"
+ "strings"
xdr "github.com/davecgh/go-xdr/xdr2"
+ "github.com/dustin/go-humanize"
)
type TRxTx string
const (
TRx TRxTx = "rx"
TTx TRxTx = "tx"
+
+ HdrSuffix = ".hdr"
)
type Job struct {
PktEnc *PktEnc
- Fd *os.File
+ Path string
Size int64
- HshValue *[32]byte
+ HshValue *[MTHSize]byte
}
-func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) {
+ var pktEnc PktEnc
+ _, err := xdr.Unmarshal(fd, &pktEnc)
+ if err != nil {
+ return nil, nil, err
+ }
+ var raw bytes.Buffer
+ if _, err = xdr.Marshal(&raw, pktEnc); err != nil {
+ panic(err)
+ }
+ return &pktEnc, raw.Bytes(), nil
+}
+
+func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error {
+ tmpHdr, err := ctx.NewTmpFile()
+ if err != nil {
+ ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string {
+ return "Header writing: new temporary file"
+ })
+ return err
+ }
+ if _, err = tmpHdr.Write(pktEncRaw); err != nil {
+ ctx.LogE("hdr-write-write", nil, err, func(les LEs) string {
+ return "Header writing: writing"
+ })
+ os.Remove(tmpHdr.Name())
+ return err
+ }
+ if err = tmpHdr.Close(); err != nil {
+ ctx.LogE("hdr-write-close", nil, err, func(les LEs) string {
+ return "Header writing: closing"
+ })
+ os.Remove(tmpHdr.Name())
+ return err
+ }
+ if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil {
+ ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string {
+ return "Header writing: renaming"
+ })
+ return err
+ }
+ return err
+}
+
+func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job {
rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx))
jobs := make(chan Job, 16)
go func() {
return
}
for _, fi := range fis {
- hshValue, err := Base32Codec.DecodeString(fi.Name())
+ name := fi.Name()
+ var hshValue []byte
+ if nock {
+ if !strings.HasSuffix(name, NoCKSuffix) ||
+ len(name) != Base32Encoded32Len+len(NoCKSuffix) {
+ continue
+ }
+ hshValue, err = Base32Codec.DecodeString(
+ strings.TrimSuffix(name, NoCKSuffix),
+ )
+ } else {
+ if len(name) != Base32Encoded32Len {
+ continue
+ }
+ hshValue, err = Base32Codec.DecodeString(name)
+ }
if err != nil {
continue
}
- fd, err := os.Open(filepath.Join(rxPath, fi.Name()))
+ pth := filepath.Join(rxPath, name)
+ hdrExists := true
+ var fd *os.File
+ if nock {
+ fd, err = os.Open(pth)
+ } else {
+ fd, err = os.Open(pth + HdrSuffix)
+ if err != nil && os.IsNotExist(err) {
+ hdrExists = false
+ fd, err = os.Open(pth)
+ }
+ }
if err != nil {
continue
}
- var pktEnc PktEnc
- if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 {
- fd.Close() // #nosec G104
+ pktEnc, pktEncRaw, err := ctx.HdrRead(fd)
+ fd.Close()
+ if err != nil || pktEnc.Magic != MagicNNCPEv5.B {
continue
}
- if _, err = fd.Seek(0, io.SeekStart); err != nil {
- fd.Close() // #nosec G104
- continue
+ ctx.LogD("job", LEs{
+ {"XX", string(xx)},
+ {"Node", pktEnc.Sender},
+ {"Name", name},
+ {"Nice", int(pktEnc.Nice)},
+ {"Size", fi.Size()},
+ }, func(les LEs) string {
+ return fmt.Sprintf(
+ "Job %s/%s/%s nice: %s size: %s",
+ pktEnc.Sender, string(xx), name,
+ NicenessFmt(pktEnc.Nice),
+ humanize.IBytes(uint64(fi.Size())),
+ )
+ })
+ if !hdrExists && ctx.HdrUsage {
+ ctx.HdrWrite(pktEncRaw, pth)
}
- ctx.LogD("jobs", SDS{
- "xx": string(xx),
- "node": pktEnc.Sender,
- "name": fi.Name(),
- "nice": int(pktEnc.Nice),
- "size": fi.Size(),
- }, "taken")
job := Job{
- PktEnc: &pktEnc,
- Fd: fd,
+ PktEnc: pktEnc,
+ Path: pth,
Size: fi.Size(),
- HshValue: new([32]byte),
+ HshValue: new([MTHSize]byte),
}
copy(job.HshValue[:], hshValue)
jobs <- job
}()
return jobs
}
+
+func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job {
+ return ctx.jobsFind(nodeId, xx, false)
+}
+
+func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job {
+ return ctx.jobsFind(nodeId, TRx, true)
+}