X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fjobs.go;h=ae18b74cfdaf3fad39883d85a9b938825dc1f8d0;hb=b47dbfe6687569650fa544a4ecf3e4ea388390cb;hp=8fe45642d4e8e33998fb89950163973c3ced5f20;hpb=ae2b0d687a4ed40fe0161b79b4a9ec42290ffb6b;p=nncp.git diff --git a/src/jobs.go b/src/jobs.go index 8fe4564..ae18b74 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -18,11 +18,15 @@ along with this program. If not, see . package nncp import ( + "bytes" + "fmt" "io" "os" "path/filepath" + "strings" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" ) type TRxTx string @@ -30,16 +34,72 @@ type TRxTx string const ( TRx TRxTx = "rx" TTx TRxTx = "tx" + + HdrDir = "hdr" ) type Job struct { PktEnc *PktEnc - Fd *os.File + Path string Size int64 - HshValue *[32]byte + HshValue *[MTHSize]byte } -func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { +func JobPath2Hdr(jobPath string) string { + return filepath.Join(filepath.Dir(jobPath), HdrDir, filepath.Base(jobPath)) +} + +func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) { + var pktEnc PktEnc + _, err := xdr.Unmarshal(r, &pktEnc) + if err != nil { + return nil, nil, err + } + var raw bytes.Buffer + if _, err = xdr.Marshal(&raw, pktEnc); err != nil { + panic(err) + } + return &pktEnc, raw.Bytes(), nil +} + +func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { + tmpHdr, err := ctx.NewTmpFile() + if err != nil { + ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string { + return "Header writing: new temporary file" + }) + return err + } + if _, err = tmpHdr.Write(pktEncRaw); err != nil { + ctx.LogE("hdr-write-write", nil, err, func(les LEs) string { + return "Header writing: writing" + }) + os.Remove(tmpHdr.Name()) + return err + } + if err = tmpHdr.Close(); err != nil { + ctx.LogE("hdr-write-close", nil, err, func(les LEs) string { + return "Header writing: closing" + }) + os.Remove(tmpHdr.Name()) + return err + } + if err = ensureDir(filepath.Dir(tgt), HdrDir); err != nil { + ctx.LogE("hdr-write-ensure-mkdir", nil, err, func(les LEs) string { + return "Header writing: ensuring directory" + }) + return err + } + if err = os.Rename(tmpHdr.Name(), JobPath2Hdr(tgt)); err != nil { + ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string { + return "Header writing: renaming" + }) + return err + } + return err +} + +func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) go func() { @@ -49,40 +109,119 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { return } fis, err := dir.Readdir(0) - dir.Close() // #nosec G104 + dir.Close() if err != nil { return } for _, fi := range fis { - hshValue, err := Base32Codec.DecodeString(fi.Name()) + name := fi.Name() + var hshValue []byte + if nock { + if !strings.HasSuffix(name, NoCKSuffix) || + len(name) != Base32Encoded32Len+len(NoCKSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, NoCKSuffix), + ) + } else if part { + if !strings.HasSuffix(name, PartSuffix) || + len(name) != Base32Encoded32Len+len(PartSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, PartSuffix), + ) + } else { + if len(name) != Base32Encoded32Len { + continue + } + hshValue, err = Base32Codec.DecodeString(name) + } if err != nil { continue } - fd, err := os.Open(filepath.Join(rxPath, fi.Name())) + pth := filepath.Join(rxPath, name) + hdrExists := true + var fd *os.File + if nock || part { + fd, err = os.Open(pth) + } else { + fd, err = os.Open(JobPath2Hdr(pth)) + if err != nil && os.IsNotExist(err) { + hdrExists = false + fd, err = os.Open(pth) + } + } if err != nil { continue } - var pktEnc PktEnc - if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 { - fd.Close() // #nosec G104 + if part { + job := Job{ + Path: pth, + Size: fi.Size(), + HshValue: new([MTHSize]byte), + } + copy(job.HshValue[:], hshValue) + jobs <- job continue } - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) + fd.Close() + if err != nil { continue } - ctx.LogD("jobs", LEs{ + switch pktEnc.Magic { + case MagicNNCPEv1.B: + err = MagicNNCPEv1.TooOld() + case MagicNNCPEv2.B: + err = MagicNNCPEv2.TooOld() + case MagicNNCPEv3.B: + err = MagicNNCPEv3.TooOld() + case MagicNNCPEv4.B: + err = MagicNNCPEv4.TooOld() + case MagicNNCPEv5.B: + err = MagicNNCPEv5.TooOld() + case MagicNNCPEv6.B: + default: + err = BadMagic + } + if err != nil { + ctx.LogE("job", LEs{ + {"XX", string(xx)}, + {"Name", name}, + {"Size", fi.Size()}, + }, err, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s size: %s", + string(xx), name, + humanize.IBytes(uint64(fi.Size())), + ) + }) + continue + } + ctx.LogD("job", LEs{ {"XX", string(xx)}, {"Node", pktEnc.Sender}, - {"Name", fi.Name()}, + {"Name", name}, {"Nice", int(pktEnc.Nice)}, {"Size", fi.Size()}, - }, "taken") + }, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s/%s nice: %s size: %s", + pktEnc.Sender, string(xx), name, + NicenessFmt(pktEnc.Nice), + humanize.IBytes(uint64(fi.Size())), + ) + }) + if !hdrExists && ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, pth) + } job := Job{ - PktEnc: &pktEnc, - Fd: fd, + PktEnc: pktEnc, + Path: pth, Size: fi.Size(), - HshValue: new([32]byte), + HshValue: new([MTHSize]byte), } copy(job.HshValue[:], hshValue) jobs <- job @@ -90,3 +229,15 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { }() return jobs } + +func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { + return ctx.jobsFind(nodeId, xx, false, false) +} + +func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, true, false) +} + +func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, false, true) +}