]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
.hdr files
[nncp.git] / src / sp.go
index b5bad2d5dcd21a16b909c44a8ab29addf6d8a3a2..e1207693cd9c985d13c9d67e9d250e15a5691304 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -21,6 +21,7 @@ import (
        "bytes"
        "crypto/subtle"
        "errors"
+       "hash"
        "io"
        "os"
        "path/filepath"
@@ -30,6 +31,7 @@ import (
 
        xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/flynn/noise"
+       "golang.org/x/crypto/blake2b"
 )
 
 const (
@@ -55,10 +57,18 @@ var (
 
        DefaultDeadline = 10 * time.Second
        PingTimeout     = time.Minute
-
-       spCheckerToken chan struct{}
 )
 
+type FdAndFullSize struct {
+       fd       *os.File
+       fullSize int64
+}
+
+type HasherAndOffset struct {
+       h      hash.Hash
+       offset uint64
+}
+
 type SPType uint8
 
 const (
@@ -148,8 +158,6 @@ func init() {
                panic(err)
        }
        SPFileOverhead = buf.Len()
-       spCheckerToken = make(chan struct{}, 1)
-       spCheckerToken <- struct{}{}
 }
 
 func MarshalSP(typ SPType, sp interface{}) []byte {
@@ -183,6 +191,7 @@ type SPState struct {
        Ctx            *Ctx
        Node           *Node
        Nice           uint8
+       NoCK           bool
        onlineDeadline time.Duration
        maxOnlineTime  time.Duration
        hs             *noise.HandshakeState
@@ -214,6 +223,9 @@ type SPState struct {
        listOnly       bool
        onlyPkts       map[[32]byte]bool
        writeSPBuf     bytes.Buffer
+       fds            map[string]FdAndFullSize
+       fileHashers    map[string]*HasherAndOffset
+       checkerJobs    chan *[32]byte
        sync.RWMutex
 }
 
@@ -235,6 +247,14 @@ func (state *SPState) SetDead() {
                for range state.pings {
                }
        }()
+       go func() {
+               for _, s := range state.fds {
+                       s.fd.Close()
+               }
+       }()
+       if !state.NoCK {
+               close(state.checkerJobs)
+       }
 }
 
 func (state *SPState) NotAlive() bool {
@@ -251,6 +271,31 @@ func (state *SPState) dirUnlock() {
        state.Ctx.UnlockDir(state.txLock)
 }
 
+func (state *SPState) SPChecker() {
+       for hshValue := range state.checkerJobs {
+               les := LEs{
+                       {"XX", string(TRx)},
+                       {"Node", state.Node.Id},
+                       {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
+               }
+               state.Ctx.LogD("sp-file", les, "checking")
+               size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
+               les = append(les, LE{"Size", size})
+               if err != nil {
+                       state.Ctx.LogE("sp-file", les, err, "")
+                       continue
+               }
+               state.Ctx.LogI("sp-done", les, "")
+               state.wg.Add(1)
+               go func(hsh *[32]byte) {
+                       if !state.NotAlive() {
+                               state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
+                       }
+                       state.wg.Done()
+               }(hshValue)
+       }
+}
+
 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
        state.writeSPBuf.Reset()
        n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
@@ -444,6 +489,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.infosTheir = make(map[[32]byte]*SPInfo)
        state.started = started
        state.xxOnly = xxOnly
+
        var buf []byte
        var payload []byte
        state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
@@ -529,17 +575,49 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        return err
 }
 
+func (state *SPState) closeFd(pth string) {
+       s, exists := state.fds[pth]
+       delete(state.fds, pth)
+       if exists {
+               s.fd.Close()
+       }
+}
+
+func (state *SPState) FillExistingNoCK() {
+       checkerJobs := make([]*[32]byte, 0)
+       for job := range state.Ctx.JobsNoCK(state.Node.Id) {
+               if job.PktEnc.Nice > state.Nice {
+                       continue
+               }
+               checkerJobs = append(checkerJobs, job.HshValue)
+       }
+       for _, job := range checkerJobs {
+               state.checkerJobs <- job
+       }
+       state.wg.Done()
+}
+
 func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
        payload []byte,
 ) error {
        les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
+       state.fds = make(map[string]FdAndFullSize)
+       state.fileHashers = make(map[string]*HasherAndOffset)
        state.isDead = make(chan struct{})
        if state.maxOnlineTime > 0 {
                state.mustFinishAt = state.started.Add(state.maxOnlineTime)
        }
 
+       // Checker
+       if !state.NoCK {
+               state.checkerJobs = make(chan *[32]byte)
+               go state.SPChecker()
+               state.wg.Add(1)
+               go state.FillExistingNoCK()
+       }
+
        // Remaining handshake payload sending
        if len(infosPayloads) > 1 {
                state.wg.Add(1)
@@ -682,22 +760,29 @@ func (state *SPState) StartWorkers(
                                        {"Size", int64(freq.Offset)},
                                }...)
                                state.Ctx.LogD("sp-file", lesp, "queueing")
-                               fd, err := os.Open(filepath.Join(
+                               pth := filepath.Join(
                                        state.Ctx.Spool,
                                        state.Node.Id.String(),
                                        string(TTx),
                                        Base32Codec.EncodeToString(freq.Hash[:]),
-                               ))
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
-                               }
-                               fi, err := fd.Stat()
-                               if err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
+                               )
+                               fdAndFullSize, exists := state.fds[pth]
+                               if !exists {
+                                       fd, err := os.Open(pth)
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file", lesp, err, "")
+                                               return
+                                       }
+                                       fi, err := fd.Stat()
+                                       if err != nil {
+                                               state.Ctx.LogE("sp-file", lesp, err, "")
+                                               return
+                                       }
+                                       fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
+                                       state.fds[pth] = fdAndFullSize
                                }
-                               fullSize := fi.Size()
+                               fd := fdAndFullSize.fd
+                               fullSize := fdAndFullSize.fullSize
                                var buf []byte
                                if freq.Offset < uint64(fullSize) {
                                        state.Ctx.LogD("sp-file", lesp, "seeking")
@@ -714,7 +799,7 @@ func (state *SPState) StartWorkers(
                                        buf = buf[:n]
                                        state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
                                }
-                               fd.Close() // #nosec G104
+                               state.closeFd(pth)
                                payload = MarshalSP(SPTypeFile, SPFile{
                                        Hash:    freq.Hash,
                                        Offset:  freq.Offset,
@@ -861,8 +946,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Lock()
                        state.queueTheir = nil
                        state.Unlock()
+
                case SPTypePing:
                        state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
+
                case SPTypeInfo:
                        infosGot = true
                        lesp := append(les, LE{"Type", "info"})
@@ -909,6 +996,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                }
                                continue
                        }
+                       if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
+                               state.Ctx.LogI("sp-info", lesp, "still non checksummed")
+                               continue
+                       }
                        fi, err := os.Stat(pktPath + PartSuffix)
                        var offset int64
                        if err == nil {
@@ -925,6 +1016,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                        SPFreq{info.Hash, uint64(offset)},
                                ))
                        }
+
                case SPTypeFile:
                        lesp := append(les, LE{"Type", "file"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -944,29 +1036,60 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                string(TRx),
                        )
                        filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
+                       filePathPart := filePath + PartSuffix
                        state.Ctx.LogD("sp-file", lesp, "opening part")
-                       fd, err := os.OpenFile(
-                               filePath+PartSuffix,
-                               os.O_RDWR|os.O_CREATE,
-                               os.FileMode(0666),
-                       )
-                       if err != nil {
-                               state.Ctx.LogE("sp-file", lesp, err, "")
-                               return nil, err
+                       fdAndFullSize, exists := state.fds[filePathPart]
+                       var fd *os.File
+                       if exists {
+                               fd = fdAndFullSize.fd
+                       } else {
+                               fd, err = os.OpenFile(
+                                       filePathPart,
+                                       os.O_RDWR|os.O_CREATE,
+                                       os.FileMode(0666),
+                               )
+                               if err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "")
+                                       return nil, err
+                               }
+                               state.fds[filePathPart] = FdAndFullSize{fd: fd}
+                               if file.Offset == 0 {
+                                       h, err := blake2b.New256(nil)
+                                       if err != nil {
+                                               panic(err)
+                                       }
+                                       state.fileHashers[filePath] = &HasherAndOffset{h: h}
+                               }
                        }
                        state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
                        if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "")
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                return nil, err
                        }
                        state.Ctx.LogD("sp-file", lesp, "writing")
-                       _, err = fd.Write(file.Payload)
-                       if err != nil {
+                       if _, err = fd.Write(file.Payload); err != nil {
                                state.Ctx.LogE("sp-file", lesp, err, "")
-                               fd.Close() // #nosec G104
+                               state.closeFd(filePathPart)
                                return nil, err
                        }
+                       hasherAndOffset, hasherExists := state.fileHashers[filePath]
+                       if hasherExists {
+                               if hasherAndOffset.offset == file.Offset {
+                                       if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
+                                               panic(err)
+                                       }
+                                       hasherAndOffset.offset += uint64(len(file.Payload))
+                               } else {
+                                       state.Ctx.LogE(
+                                               "sp-file", lesp,
+                                               errors.New("offset differs"),
+                                               "deleting hasher",
+                                       )
+                                       delete(state.fileHashers, filePath)
+                                       hasherExists = false
+                               }
+                       }
                        ourSize := int64(file.Offset + uint64(len(file.Payload)))
                        lesp[len(lesp)-1].V = ourSize
                        fullsize := int64(0)
@@ -981,51 +1104,71 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                Progress("Rx", lesp)
                        }
                        if fullsize != ourSize {
-                               fd.Close() // #nosec G104
                                continue
                        }
-                       <-spCheckerToken
-                       go func() {
-                               defer func() {
-                                       spCheckerToken <- struct{}{}
-                               }()
-                               if err := fd.Sync(); err != nil {
-                                       state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       fd.Close() // #nosec G104
-                                       return
-                               }
-                               state.wg.Add(1)
-                               defer state.wg.Done()
-                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
-                                       fd.Close() // #nosec G104
-                                       state.Ctx.LogE("sp-file", lesp, err, "")
-                                       return
-                               }
-                               state.Ctx.LogD("sp-file", lesp, "checking")
-                               gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
-                               fd.Close() // #nosec G104
-                               if err != nil || !gut {
+                       err = fd.Sync()
+                       if err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "sync")
+                               state.closeFd(filePathPart)
+                               continue
+                       }
+                       if hasherExists {
+                               if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
                                        state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
-                                       return
+                                       continue
                                }
-                               state.Ctx.LogI("sp-done", lesp, "")
-                               if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
+                               if err = os.Rename(filePathPart, filePath); err != nil {
                                        state.Ctx.LogE("sp-file", lesp, err, "rename")
-                                       return
+                                       continue
                                }
                                if err = DirSync(dirToSync); err != nil {
                                        state.Ctx.LogE("sp-file", lesp, err, "sync")
-                                       return
+                                       continue
                                }
-                               state.Lock()
-                               delete(state.infosTheir, *file.Hash)
-                               state.Unlock()
+                               state.Ctx.LogI("sp-file", lesp, "done")
                                state.wg.Add(1)
                                go func() {
                                        state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
                                        state.wg.Done()
                                }()
-                       }()
+                               state.Lock()
+                               delete(state.infosTheir, *file.Hash)
+                               state.Unlock()
+                               if !state.Ctx.HdrUsage {
+                                       state.closeFd(filePathPart)
+                                       continue
+                               }
+                               if _, err = fd.Seek(0, io.SeekStart); err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "seek")
+                                       state.closeFd(filePathPart)
+                                       continue
+                               }
+                               _, pktEncRaw, err := state.Ctx.HdrRead(fd)
+                               state.closeFd(filePathPart)
+                               if err != nil {
+                                       state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
+                                       continue
+                               }
+                               state.Ctx.HdrWrite(pktEncRaw, filePath)
+                               continue
+                       }
+                       state.closeFd(filePathPart)
+                       if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "rename")
+                               continue
+                       }
+                       if err = DirSync(dirToSync); err != nil {
+                               state.Ctx.LogE("sp-file", lesp, err, "sync")
+                               continue
+                       }
+                       state.Ctx.LogI("sp-file", lesp, "downloaded")
+                       state.Lock()
+                       delete(state.infosTheir, *file.Hash)
+                       state.Unlock()
+                       if !state.NoCK {
+                               state.checkerJobs <- file.Hash
+                       }
+
                case SPTypeDone:
                        lesp := append(les, LE{"Type", "done"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1036,18 +1179,23 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        }
                        lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
                        state.Ctx.LogD("sp-done", lesp, "removing")
-                       err := os.Remove(filepath.Join(
+                       pth := filepath.Join(
                                state.Ctx.Spool,
                                state.Node.Id.String(),
                                string(TTx),
                                Base32Codec.EncodeToString(done.Hash[:]),
-                       ))
+                       )
+                       err := os.Remove(pth)
                        lesp = append(lesp, LE{"XX", string(TTx)})
                        if err == nil {
                                state.Ctx.LogI("sp-done", lesp, "")
+                               if state.Ctx.HdrUsage {
+                                       os.Remove(pth + HdrSuffix)
+                               }
                        } else {
                                state.Ctx.LogE("sp-done", lesp, err, "")
                        }
+
                case SPTypeFreq:
                        lesp := append(les, LE{"Type", "freq"})
                        state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
@@ -1080,6 +1228,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        } else {
                                state.Ctx.LogD("sp-process", lesp, "unknown")
                        }
+
                default:
                        state.Ctx.LogE(
                                "sp-process",