X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fjobs.go;h=ae18b74cfdaf3fad39883d85a9b938825dc1f8d0;hb=b47dbfe6687569650fa544a4ecf3e4ea388390cb;hp=1b5cef8fbf24c5e1824cbff0c473cd478cf69bb2;hpb=5a9bf58a2638e42f2d42fa4d43c363a664fe8198;p=nncp.git diff --git a/src/jobs.go b/src/jobs.go index 1b5cef8..ae18b74 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -20,6 +20,7 @@ package nncp import ( "bytes" "fmt" + "io" "os" "path/filepath" "strings" @@ -34,19 +35,23 @@ const ( TRx TRxTx = "rx" TTx TRxTx = "tx" - HdrSuffix = ".hdr" + HdrDir = "hdr" ) type Job struct { PktEnc *PktEnc Path string Size int64 - HshValue *[32]byte + HshValue *[MTHSize]byte } -func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) { +func JobPath2Hdr(jobPath string) string { + return filepath.Join(filepath.Dir(jobPath), HdrDir, filepath.Base(jobPath)) +} + +func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) { var pktEnc PktEnc - _, err := xdr.Unmarshal(fd, &pktEnc) + _, err := xdr.Unmarshal(r, &pktEnc) if err != nil { return nil, nil, err } @@ -79,7 +84,13 @@ func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { os.Remove(tmpHdr.Name()) return err } - if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil { + if err = ensureDir(filepath.Dir(tgt), HdrDir); err != nil { + ctx.LogE("hdr-write-ensure-mkdir", nil, err, func(les LEs) string { + return "Header writing: ensuring directory" + }) + return err + } + if err = os.Rename(tmpHdr.Name(), JobPath2Hdr(tgt)); err != nil { ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string { return "Header writing: renaming" }) @@ -88,7 +99,7 @@ func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { return err } -func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { +func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) go func() { @@ -98,7 +109,7 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { return } fis, err := dir.Readdir(0) - dir.Close() // #nosec G104 + dir.Close() if err != nil { return } @@ -113,6 +124,14 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { hshValue, err = Base32Codec.DecodeString( strings.TrimSuffix(name, NoCKSuffix), ) + } else if part { + if !strings.HasSuffix(name, PartSuffix) || + len(name) != Base32Encoded32Len+len(PartSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, PartSuffix), + ) } else { if len(name) != Base32Encoded32Len { continue @@ -125,10 +144,10 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { pth := filepath.Join(rxPath, name) hdrExists := true var fd *os.File - if nock { + if nock || part { fd, err = os.Open(pth) } else { - fd, err = os.Open(pth + HdrSuffix) + fd, err = os.Open(JobPath2Hdr(pth)) if err != nil && os.IsNotExist(err) { hdrExists = false fd, err = os.Open(pth) @@ -137,9 +156,48 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { if err != nil { continue } + if part { + job := Job{ + Path: pth, + Size: fi.Size(), + HshValue: new([MTHSize]byte), + } + copy(job.HshValue[:], hshValue) + jobs <- job + continue + } pktEnc, pktEncRaw, err := ctx.HdrRead(fd) fd.Close() - if err != nil || pktEnc.Magic != MagicNNCPEv4 { + if err != nil { + continue + } + switch pktEnc.Magic { + case MagicNNCPEv1.B: + err = MagicNNCPEv1.TooOld() + case MagicNNCPEv2.B: + err = MagicNNCPEv2.TooOld() + case MagicNNCPEv3.B: + err = MagicNNCPEv3.TooOld() + case MagicNNCPEv4.B: + err = MagicNNCPEv4.TooOld() + case MagicNNCPEv5.B: + err = MagicNNCPEv5.TooOld() + case MagicNNCPEv6.B: + default: + err = BadMagic + } + if err != nil { + ctx.LogE("job", LEs{ + {"XX", string(xx)}, + {"Name", name}, + {"Size", fi.Size()}, + }, err, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s size: %s", + string(xx), name, + humanize.IBytes(uint64(fi.Size())), + ) + }) continue } ctx.LogD("job", LEs{ @@ -163,7 +221,7 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { PktEnc: pktEnc, Path: pth, Size: fi.Size(), - HshValue: new([32]byte), + HshValue: new([MTHSize]byte), } copy(job.HshValue[:], hshValue) jobs <- job @@ -173,9 +231,13 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { } func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { - return ctx.jobsFind(nodeId, xx, false) + return ctx.jobsFind(nodeId, xx, false, false) } func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job { - return ctx.jobsFind(nodeId, TRx, true) + return ctx.jobsFind(nodeId, TRx, true, false) +} + +func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, false, true) }