X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fjobs.go;h=05a7c9c66c91cee809e3cafc9fe8a0caea056a41;hb=0367cce2741e1ce6a89a49fd5c4e9df6005c9744;hp=16c95bfbbe2b7276499c912e0fb28a888c21c19c;hpb=271870ad4f56253e0918f673b90615e4749cf201;p=nncp.git diff --git a/src/jobs.go b/src/jobs.go index 16c95bf..05a7c9c 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2020 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -18,11 +18,15 @@ along with this program. If not, see . package nncp import ( + "bytes" + "fmt" "io" "os" "path/filepath" + "strings" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" ) type TRxTx string @@ -30,16 +34,72 @@ type TRxTx string const ( TRx TRxTx = "rx" TTx TRxTx = "tx" + + HdrDir = "hdr" ) type Job struct { PktEnc *PktEnc - Fd *os.File + Path string Size int64 - HshValue *[32]byte + HshValue *[MTHSize]byte } -func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { +func JobPath2Hdr(jobPath string) string { + return filepath.Join(filepath.Dir(jobPath), HdrDir, filepath.Base(jobPath)) +} + +func (ctx *Ctx) HdrRead(r io.Reader) (*PktEnc, []byte, error) { + var pktEnc PktEnc + _, err := xdr.Unmarshal(r, &pktEnc) + if err != nil { + return nil, nil, err + } + var raw bytes.Buffer + if _, err = xdr.Marshal(&raw, pktEnc); err != nil { + panic(err) + } + return &pktEnc, raw.Bytes(), nil +} + +func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { + tmpHdr, err := ctx.NewTmpFile() + if err != nil { + ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string { + return "Header writing: new temporary file" + }) + return err + } + if _, err = tmpHdr.Write(pktEncRaw); err != nil { + ctx.LogE("hdr-write-write", nil, err, func(les LEs) string { + return "Header writing: writing" + }) + os.Remove(tmpHdr.Name()) + return err + } + if err = tmpHdr.Close(); err != nil { + ctx.LogE("hdr-write-close", nil, err, func(les LEs) string { + return "Header writing: closing" + }) + os.Remove(tmpHdr.Name()) + return err + } + if err = ensureDir(filepath.Dir(tgt), HdrDir); err != nil { + ctx.LogE("hdr-write-ensure-mkdir", nil, err, func(les LEs) string { + return "Header writing: ensuring directory" + }) + return err + } + if err = os.Rename(tmpHdr.Name(), JobPath2Hdr(tgt)); err != nil { + ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string { + return "Header writing: renaming" + }) + return err + } + return err +} + +func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock, part bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) go func() { @@ -54,32 +114,114 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { return } for _, fi := range fis { - hshValue, err := FromBase32(fi.Name()) + name := fi.Name() + var hshValue []byte + if nock { + if !strings.HasSuffix(name, NoCKSuffix) || + len(name) != Base32Encoded32Len+len(NoCKSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, NoCKSuffix), + ) + } else if part { + if !strings.HasSuffix(name, PartSuffix) || + len(name) != Base32Encoded32Len+len(PartSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, PartSuffix), + ) + } else { + if len(name) != Base32Encoded32Len { + continue + } + hshValue, err = Base32Codec.DecodeString(name) + } if err != nil { continue } - fd, err := os.Open(filepath.Join(rxPath, fi.Name())) + pth := filepath.Join(rxPath, name) + hdrExists := true + var fd *os.File + if nock || part { + fd, err = os.Open(pth) + } else { + fd, err = os.Open(JobPath2Hdr(pth)) + if err != nil && os.IsNotExist(err) { + hdrExists = false + fd, err = os.Open(pth) + } + } if err != nil { continue } - var pktEnc PktEnc - if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 { - fd.Close() + if part { + job := Job{ + Path: pth, + Size: fi.Size(), + HshValue: new([MTHSize]byte), + } + copy(job.HshValue[:], hshValue) + jobs <- job + continue + } + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) + fd.Close() + if err != nil { continue } - fd.Seek(0, io.SeekStart) - ctx.LogD("jobs", SDS{ - "xx": string(xx), - "node": pktEnc.Sender, - "name": fi.Name(), - "nice": int(pktEnc.Nice), - "size": fi.Size(), - }, "taken") + switch pktEnc.Magic { + case MagicNNCPEv1.B: + err = MagicNNCPEv1.TooOld() + case MagicNNCPEv2.B: + err = MagicNNCPEv2.TooOld() + case MagicNNCPEv3.B: + err = MagicNNCPEv3.TooOld() + case MagicNNCPEv4.B: + err = MagicNNCPEv4.TooOld() + case MagicNNCPEv5.B: + err = MagicNNCPEv5.TooOld() + case MagicNNCPEv6.B: + default: + err = BadMagic + } + if err != nil { + ctx.LogE("job", LEs{ + {"XX", string(xx)}, + {"Name", name}, + {"Size", fi.Size()}, + }, err, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s size: %s", + string(xx), name, + humanize.IBytes(uint64(fi.Size())), + ) + }) + continue + } + ctx.LogD("job", LEs{ + {"XX", string(xx)}, + {"Node", pktEnc.Sender}, + {"Name", name}, + {"Nice", int(pktEnc.Nice)}, + {"Size", fi.Size()}, + }, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s/%s nice: %s size: %s", + pktEnc.Sender, string(xx), name, + NicenessFmt(pktEnc.Nice), + humanize.IBytes(uint64(fi.Size())), + ) + }) + if !hdrExists && ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, pth) + } job := Job{ - PktEnc: &pktEnc, - Fd: fd, + PktEnc: pktEnc, + Path: pth, Size: fi.Size(), - HshValue: new([32]byte), + HshValue: new([MTHSize]byte), } copy(job.HshValue[:], hshValue) jobs <- job @@ -87,3 +229,15 @@ func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { }() return jobs } + +func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { + return ctx.jobsFind(nodeId, xx, false, false) +} + +func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, true, false) +} + +func (ctx *Ctx) JobsPart(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, false, true) +}