]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/sp.go
Operations progress
[nncp.git] / src / sp.go
index 7b8fc3113b2db7d2111ba68bf7c1f0c346bc98cf..ce2e62f5d86be3492fc7a4192bca978c9c62b760 100644 (file)
--- a/src/sp.go
+++ b/src/sp.go
@@ -26,7 +26,6 @@ import (
        "os"
        "path/filepath"
        "sort"
-       "strconv"
        "sync"
        "time"
 
@@ -271,15 +270,15 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [
                ctx.LogD("sp-info-our", SDS{
                        "node": nodeId,
                        "name": ToBase32(info.Hash[:]),
-                       "size": strconv.FormatInt(int64(info.Size), 10),
+                       "size": info.Size,
                }, "")
        }
        if totalSize > 0 {
                ctx.LogI("sp-infos", SDS{
                        "xx":   string(TTx),
                        "node": nodeId,
-                       "pkts": strconv.Itoa(len(payloads)),
-                       "size": strconv.FormatInt(totalSize, 10),
+                       "pkts": len(payloads),
+                       "size": totalSize,
                }, "")
        }
        return payloadsSplit(payloads)
@@ -348,31 +347,31 @@ func (state *SPState) StartI(conn ConnDeadlined) error {
                state.dirUnlock()
                return err
        }
-       sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(state.Nice))}
+       sds := SDS{"node": nodeId, "nice": int(state.Nice)}
        state.Ctx.LogD("sp-start", sds, "sending first message")
        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
        if err = state.WriteSP(conn, buf); err != nil {
-               state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+               state.Ctx.LogE("sp-start", sds, err, "")
                state.dirUnlock()
                return err
        }
        state.Ctx.LogD("sp-start", sds, "waiting for first message")
        conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
        if buf, err = state.ReadSP(conn); err != nil {
-               state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+               state.Ctx.LogE("sp-start", sds, err, "")
                state.dirUnlock()
                return err
        }
        payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
        if err != nil {
-               state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+               state.Ctx.LogE("sp-start", sds, err, "")
                state.dirUnlock()
                return err
        }
        state.Ctx.LogD("sp-start", sds, "starting workers")
        err = state.StartWorkers(conn, infosPayloads, payload)
        if err != nil {
-               state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+               state.Ctx.LogE("sp-start", sds, err, "")
                state.dirUnlock()
                return err
        }
@@ -403,18 +402,14 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.xxOnly = xxOnly
        var buf []byte
        var payload []byte
-       state.Ctx.LogD(
-               "sp-start",
-               SDS{"nice": strconv.Itoa(int(state.Nice))},
-               "waiting for first message",
-       )
+       state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
        conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
        if buf, err = state.ReadSP(conn); err != nil {
-               state.Ctx.LogE("sp-start", SDS{"err": err}, "")
+               state.Ctx.LogE("sp-start", SDS{}, err, "")
                return err
        }
        if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
-               state.Ctx.LogE("sp-start", SDS{"err": err}, "")
+               state.Ctx.LogE("sp-start", SDS{}, err, "")
                return err
        }
 
@@ -427,7 +422,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        }
        if node == nil {
                peerId := ToBase32(state.hs.PeerStatic())
-               state.Ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
+               state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
                return errors.New("Unknown peer: " + peerId)
        }
        state.Node = node
@@ -435,7 +430,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        state.txRate = node.TxRate
        state.onlineDeadline = node.OnlineDeadline
        state.maxOnlineTime = node.MaxOnlineTime
-       sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(state.Nice))}
+       sds := SDS{"node": node.Id, "nice": int(state.Nice)}
 
        if state.Ctx.ensureRxDir(node.Id); err != nil {
                return err
@@ -478,7 +473,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error {
        }
        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
        if err = state.WriteSP(conn, buf); err != nil {
-               state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
+               state.Ctx.LogE("sp-start", sds, err, "")
                state.dirUnlock()
                return err
        }
@@ -495,13 +490,13 @@ func (state *SPState) StartWorkers(
        conn ConnDeadlined,
        infosPayloads [][]byte,
        payload []byte) error {
-       sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))}
+       sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
        if len(infosPayloads) > 1 {
                go func() {
                        for _, payload := range infosPayloads[1:] {
                                state.Ctx.LogD(
                                        "sp-work",
-                                       SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                                       SdsAdd(sds, SDS{"size": len(payload)}),
                                        "queuing remaining payload",
                                )
                                state.payloads <- payload
@@ -510,12 +505,12 @@ func (state *SPState) StartWorkers(
        }
        state.Ctx.LogD(
                "sp-work",
-               SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+               SdsAdd(sds, SDS{"size": len(payload)}),
                "processing first payload",
        )
        replies, err := state.ProcessSP(payload)
        if err != nil {
-               state.Ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
+               state.Ctx.LogE("sp-work", sds, err, "")
                return err
        }
 
@@ -523,7 +518,7 @@ func (state *SPState) StartWorkers(
                for _, reply := range replies {
                        state.Ctx.LogD(
                                "sp-work",
-                               SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
+                               SdsAdd(sds, SDS{"size": len(reply)}),
                                "queuing reply",
                        )
                        state.payloads <- reply
@@ -543,7 +538,7 @@ func (state *SPState) StartWorkers(
                                ) {
                                        state.Ctx.LogD(
                                                "sp-work",
-                                               SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                                               SdsAdd(sds, SDS{"size": len(payload)}),
                                                "queuing new info",
                                        )
                                        state.payloads <- payload
@@ -567,7 +562,7 @@ func (state *SPState) StartWorkers(
                        case payload = <-state.payloads:
                                state.Ctx.LogD(
                                        "sp-xmit",
-                                       SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                                       SdsAdd(sds, SDS{"size": len(payload)}),
                                        "got payload",
                                )
                        default:
@@ -589,8 +584,8 @@ func (state *SPState) StartWorkers(
 
                                sdsp := SdsAdd(sds, SDS{
                                        "xx":   string(TTx),
-                                       "hash": ToBase32(freq.Hash[:]),
-                                       "size": strconv.FormatInt(int64(freq.Offset), 10),
+                                       "pkt":  ToBase32(freq.Hash[:]),
+                                       "size": int64(freq.Offset),
                                })
                                state.Ctx.LogD("sp-file", sdsp, "queueing")
                                fd, err := os.Open(filepath.Join(
@@ -600,32 +595,32 @@ func (state *SPState) StartWorkers(
                                        ToBase32(freq.Hash[:]),
                                ))
                                if err != nil {
-                                       state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                                       state.Ctx.LogE("sp-file", sdsp, err, "")
                                        break
                                }
                                fi, err := fd.Stat()
                                if err != nil {
-                                       state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                                       state.Ctx.LogE("sp-file", sdsp, err, "")
                                        break
                                }
-                               fullSize := uint64(fi.Size())
+                               fullSize := fi.Size()
                                var buf []byte
-                               if freq.Offset < fullSize {
+                               if freq.Offset < uint64(fullSize) {
                                        state.Ctx.LogD("sp-file", sdsp, "seeking")
                                        if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
-                                               state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                                               state.Ctx.LogE("sp-file", sdsp, err, "")
                                                break
                                        }
                                        buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
                                        n, err := fd.Read(buf)
                                        if err != nil {
-                                               state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                                               state.Ctx.LogE("sp-file", sdsp, err, "")
                                                break
                                        }
                                        buf = buf[:n]
                                        state.Ctx.LogD(
                                                "sp-file",
-                                               SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
+                                               SdsAdd(sdsp, SDS{"size": n}),
                                                "read",
                                        )
                                }
@@ -636,12 +631,14 @@ func (state *SPState) StartWorkers(
                                        Payload: buf,
                                })
                                ourSize := freq.Offset + uint64(len(buf))
-                               sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
-                               sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
-                               state.Ctx.LogP("sp-file", sdsp, "")
+                               sdsp["size"] = int64(ourSize)
+                               sdsp["fullsize"] = fullSize
+                               if state.Ctx.ShowPrgrs {
+                                       Progress(sdsp)
+                               }
                                state.Lock()
                                if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
-                                       if ourSize == fullSize {
+                                       if ourSize == uint64(fullSize) {
                                                state.Ctx.LogD("sp-file", sdsp, "finished")
                                                if len(state.queueTheir) > 1 {
                                                        state.queueTheir = state.queueTheir[1:]
@@ -658,12 +655,12 @@ func (state *SPState) StartWorkers(
                        }
                        state.Ctx.LogD(
                                "sp-xmit",
-                               SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                               SdsAdd(sds, SDS{"size": len(payload)}),
                                "sending",
                        )
                        conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
                        if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
-                               state.Ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-xmit", sds, err, "")
                                break
                        }
                }
@@ -691,34 +688,34 @@ func (state *SPState) StartWorkers(
                                if unmarshalErr.ErrorCode == xdr.ErrIO {
                                        break
                                }
-                               state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-recv", sds, err, "")
                                break
                        }
                        state.Ctx.LogD(
                                "sp-recv",
-                               SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                               SdsAdd(sds, SDS{"size": len(payload)}),
                                "got payload",
                        )
                        payload, err = state.csTheir.Decrypt(nil, nil, payload)
                        if err != nil {
-                               state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-recv", sds, err, "")
                                break
                        }
                        state.Ctx.LogD(
                                "sp-recv",
-                               SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
+                               SdsAdd(sds, SDS{"size": len(payload)}),
                                "processing",
                        )
                        replies, err := state.ProcessSP(payload)
                        if err != nil {
-                               state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-recv", sds, err, "")
                                break
                        }
                        go func() {
                                for _, reply := range replies {
                                        state.Ctx.LogD(
                                                "sp-recv",
-                                               SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
+                                               SdsAdd(sds, SDS{"size": len(reply)}),
                                                "queuing reply",
                                        )
                                        state.payloads <- reply
@@ -750,7 +747,7 @@ func (state *SPState) Wait() {
 }
 
 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
-       sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))}
+       sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
        r := bytes.NewReader(payload)
        var err error
        var replies [][]byte
@@ -759,7 +756,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                state.Ctx.LogD("sp-process", sds, "unmarshaling header")
                var head SPHead
                if _, err = xdr.Unmarshal(r, &head); err != nil {
-                       state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
+                       state.Ctx.LogE("sp-process", sds, err, "")
                        return nil, err
                }
                switch head.Type {
@@ -769,13 +766,13 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
                        var info SPInfo
                        if _, err = xdr.Unmarshal(r, &info); err != nil {
-                               state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-process", sdsp, err, "")
                                return nil, err
                        }
                        sdsp = SdsAdd(sds, SDS{
-                               "hash": ToBase32(info.Hash[:]),
-                               "size": strconv.FormatInt(int64(info.Size), 10),
-                               "nice": strconv.Itoa(int(info.Nice)),
+                               "pkt":  ToBase32(info.Hash[:]),
+                               "size": int64(info.Size),
+                               "nice": int(info.Nice),
                        })
                        if !state.listOnly && info.Nice > state.Nice {
                                state.Ctx.LogD("sp-process", sdsp, "too nice")
@@ -820,7 +817,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        }
                        state.Ctx.LogI(
                                "sp-info",
-                               SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
+                               SdsAdd(sdsp, SDS{"offset": offset}),
                                "",
                        )
                        if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
@@ -834,15 +831,12 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
                        var file SPFile
                        if _, err = xdr.Unmarshal(r, &file); err != nil {
-                               state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{
-                                       "err":  err,
-                                       "type": "file",
-                               }), "")
+                               state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
                                return nil, err
                        }
                        sdsp["xx"] = string(TRx)
-                       sdsp["hash"] = ToBase32(file.Hash[:])
-                       sdsp["size"] = strconv.Itoa(len(file.Payload))
+                       sdsp["pkt"] = ToBase32(file.Hash[:])
+                       sdsp["size"] = len(file.Payload)
                        dirToSync := filepath.Join(
                                state.Ctx.Spool,
                                state.Node.Id.String(),
@@ -856,31 +850,33 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                os.FileMode(0666),
                        )
                        if err != nil {
-                               state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-file", sdsp, err, "")
                                return nil, err
                        }
                        state.Ctx.LogD(
                                "sp-file",
-                               SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
+                               SdsAdd(sdsp, SDS{"offset": file.Offset}),
                                "seeking",
                        )
                        if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
-                               state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-file", sdsp, err, "")
                                fd.Close()
                                return nil, err
                        }
                        state.Ctx.LogD("sp-file", sdsp, "writing")
                        _, err = fd.Write(file.Payload)
                        if err != nil {
-                               state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-file", sdsp, err, "")
                                fd.Close()
                                return nil, err
                        }
-                       ourSize := uint64(file.Offset) + uint64(len(file.Payload))
+                       ourSize := file.Offset + uint64(len(file.Payload))
                        state.RLock()
-                       sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
-                       sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
-                       state.Ctx.LogP("sp-file", sdsp, "")
+                       sdsp["size"] = int64(ourSize)
+                       sdsp["fullsize"] = int64(state.infosTheir[*file.Hash].Size)
+                       if state.Ctx.ShowPrgrs {
+                               Progress(sdsp)
+                       }
                        if state.infosTheir[*file.Hash].Size != ourSize {
                                state.RUnlock()
                                fd.Close()
@@ -891,7 +887,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        spWorkersGroup.Add(1)
                        go func() {
                                if err := fd.Sync(); err != nil {
-                                       state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
+                                       state.Ctx.LogE("sp-file", sdsp, err, "sync")
                                        fd.Close()
                                        return
                                }
@@ -899,19 +895,19 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                                defer state.wg.Done()
                                fd.Seek(0, io.SeekStart)
                                state.Ctx.LogD("sp-file", sdsp, "checking")
-                               gut, err := Check(fd, file.Hash[:])
+                               gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
                                fd.Close()
                                if err != nil || !gut {
-                                       state.Ctx.LogE("sp-file", sdsp, "checksum mismatch")
+                                       state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
                                        return
                                }
                                state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
                                if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
-                                       state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "rename")
+                                       state.Ctx.LogE("sp-file", sdsp, err, "rename")
                                        return
                                }
                                if err = DirSync(dirToSync); err != nil {
-                                       state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
+                                       state.Ctx.LogE("sp-file", sdsp, err, "sync")
                                        return
                                }
                                state.Lock()
@@ -927,13 +923,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
                        var done SPDone
                        if _, err = xdr.Unmarshal(r, &done); err != nil {
-                               state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{
-                                       "type": "done",
-                                       "err":  err,
-                               }), "")
+                               state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
                                return nil, err
                        }
-                       sdsp["hash"] = ToBase32(done.Hash[:])
+                       sdsp["pkt"] = ToBase32(done.Hash[:])
                        state.Ctx.LogD("sp-done", sdsp, "removing")
                        err := os.Remove(filepath.Join(
                                state.Ctx.Spool,
@@ -945,18 +938,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        if err == nil {
                                state.Ctx.LogI("sp-done", sdsp, "")
                        } else {
-                               state.Ctx.LogE("sp-done", sdsp, "")
+                               state.Ctx.LogE("sp-done", sdsp, err, "")
                        }
                case SPTypeFreq:
                        sdsp := SdsAdd(sds, SDS{"type": "freq"})
                        state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
                        var freq SPFreq
                        if _, err = xdr.Unmarshal(r, &freq); err != nil {
-                               state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
+                               state.Ctx.LogE("sp-process", sdsp, err, "")
                                return nil, err
                        }
-                       sdsp["hash"] = ToBase32(freq.Hash[:])
-                       sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10)
+                       sdsp["pkt"] = ToBase32(freq.Hash[:])
+                       sdsp["offset"] = freq.Offset
                        state.Ctx.LogD("sp-process", sdsp, "queueing")
                        nice, exists := state.infosOurSeen[*freq.Hash]
                        if exists {
@@ -988,7 +981,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                        state.Ctx.LogE(
                                "sp-process",
                                SdsAdd(sds, SDS{"type": head.Type}),
-                               "unknown",
+                               errors.New("unknown type"),
+                               "",
                        )
                        return nil, BadPktType
                }
@@ -1005,8 +999,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
                state.Ctx.LogI("sp-infos", SDS{
                        "xx":   string(TRx),
                        "node": state.Node.Id,
-                       "pkts": strconv.Itoa(pkts),
-                       "size": strconv.FormatInt(int64(size), 10),
+                       "pkts": pkts,
+                       "size": int64(size),
                }, "")
        }
        return payloadsSplit(replies), nil