X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Fsp.go;h=ce2e62f5d86be3492fc7a4192bca978c9c62b760;hb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;hp=540e0e52a22a4f016114ca8a9fed71599802ed54;hpb=8936da23acb06dc60b7c1f94845d5525f98665aa;p=nncp.git diff --git a/src/sp.go b/src/sp.go index 540e0e5..ce2e62f 100644 --- a/src/sp.go +++ b/src/sp.go @@ -26,7 +26,6 @@ import ( "os" "path/filepath" "sort" - "strconv" "sync" "time" @@ -231,6 +230,10 @@ func (state *SPState) ReadSP(src io.Reader) ([]byte, error) { var sp SPRaw n, err := xdr.UnmarshalLimited(src, &sp, 1<<17) if err != nil { + ue := err.(*xdr.UnmarshalError) + if ue.Err == io.EOF { + return nil, ue.Err + } return nil, err } state.RxLastSeen = time.Now() @@ -267,15 +270,15 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [ ctx.LogD("sp-info-our", SDS{ "node": nodeId, "name": ToBase32(info.Hash[:]), - "size": strconv.FormatInt(int64(info.Size), 10), + "size": info.Size, }, "") } if totalSize > 0 { ctx.LogI("sp-infos", SDS{ "xx": string(TTx), "node": nodeId, - "pkts": strconv.Itoa(len(payloads)), - "size": strconv.FormatInt(totalSize, 10), + "pkts": len(payloads), + "size": totalSize, }, "") } return payloadsSplit(payloads) @@ -344,31 +347,31 @@ func (state *SPState) StartI(conn ConnDeadlined) error { state.dirUnlock() return err } - sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": nodeId, "nice": int(state.Nice)} state.Ctx.LogD("sp-start", sds, "sending first message") conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } state.Ctx.LogD("sp-start", sds, "waiting for first message") conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } state.Ctx.LogD("sp-start", sds, "starting workers") err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } @@ -399,30 +402,27 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.xxOnly = xxOnly var buf []byte var payload []byte - state.Ctx.LogD( - "sp-start", - SDS{"nice": strconv.Itoa(int(state.Nice))}, - "waiting for first message", - ) + state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message") conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second)) if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", SDS{"err": err}, "") + state.Ctx.LogE("sp-start", SDS{}, err, "") return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", SDS{"err": err}, "") + state.Ctx.LogE("sp-start", SDS{}, err, "") return err } var node *Node - for _, node = range state.Ctx.Neigh { - if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 { + for _, n := range state.Ctx.Neigh { + if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 { + node = n break } } if node == nil { peerId := ToBase32(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown") + state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "") return errors.New("Unknown peer: " + peerId) } state.Node = node @@ -430,7 +430,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": node.Id, "nice": int(state.Nice)} if state.Ctx.ensureRxDir(node.Id); err != nil { return err @@ -473,7 +473,7 @@ func (state *SPState) StartR(conn ConnDeadlined) error { } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err = state.WriteSP(conn, buf); err != nil { - state.Ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-start", sds, err, "") state.dirUnlock() return err } @@ -490,13 +490,13 @@ func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, payload []byte) error { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} if len(infosPayloads) > 1 { go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "queuing remaining payload", ) state.payloads <- payload @@ -505,12 +505,12 @@ func (state *SPState) StartWorkers( } state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "processing first payload", ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-work", sds, err, "") return err } @@ -518,7 +518,7 @@ func (state *SPState) StartWorkers( for _, reply := range replies { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), + SdsAdd(sds, SDS{"size": len(reply)}), "queuing reply", ) state.payloads <- reply @@ -538,7 +538,7 @@ func (state *SPState) StartWorkers( ) { state.Ctx.LogD( "sp-work", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "queuing new info", ) state.payloads <- payload @@ -562,7 +562,7 @@ func (state *SPState) StartWorkers( case payload = <-state.payloads: state.Ctx.LogD( "sp-xmit", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "got payload", ) default: @@ -584,8 +584,8 @@ func (state *SPState) StartWorkers( sdsp := SdsAdd(sds, SDS{ "xx": string(TTx), - "hash": ToBase32(freq.Hash[:]), - "size": strconv.FormatInt(int64(freq.Offset), 10), + "pkt": ToBase32(freq.Hash[:]), + "size": int64(freq.Offset), }) state.Ctx.LogD("sp-file", sdsp, "queueing") fd, err := os.Open(filepath.Join( @@ -595,32 +595,32 @@ func (state *SPState) StartWorkers( ToBase32(freq.Hash[:]), )) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") break } fi, err := fd.Stat() if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") break } - fullSize := uint64(fi.Size()) + fullSize := fi.Size() var buf []byte - if freq.Offset < fullSize { + if freq.Offset < uint64(fullSize) { state.Ctx.LogD("sp-file", sdsp, "seeking") if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") break } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") break } buf = buf[:n] state.Ctx.LogD( "sp-file", - SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}), + SdsAdd(sdsp, SDS{"size": n}), "read", ) } @@ -631,12 +631,14 @@ func (state *SPState) StartWorkers( Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10) - state.Ctx.LogP("sp-file", sdsp, "") + sdsp["size"] = int64(ourSize) + sdsp["fullsize"] = fullSize + if state.Ctx.ShowPrgrs { + Progress(sdsp) + } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { - if ourSize == fullSize { + if ourSize == uint64(fullSize) { state.Ctx.LogD("sp-file", sdsp, "finished") if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] @@ -653,12 +655,12 @@ func (state *SPState) StartWorkers( } state.Ctx.LogD( "sp-xmit", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "sending", ) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second)) if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil { - state.Ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-xmit", sds, err, "") break } } @@ -686,34 +688,34 @@ func (state *SPState) StartWorkers( if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", sds, err, "") break } state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "got payload", ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", sds, err, "") break } state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}), + SdsAdd(sds, SDS{"size": len(payload)}), "processing", ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-recv", sds, err, "") break } go func() { for _, reply := range replies { state.Ctx.LogD( "sp-recv", - SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}), + SdsAdd(sds, SDS{"size": len(reply)}), "queuing reply", ) state.payloads <- reply @@ -745,7 +747,7 @@ func (state *SPState) Wait() { } func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { - sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.Nice))} + sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)} r := bytes.NewReader(payload) var err error var replies [][]byte @@ -754,7 +756,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sds, "unmarshaling header") var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", sds, err, "") return nil, err } switch head.Type { @@ -764,13 +766,13 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", sdsp, err, "") return nil, err } sdsp = SdsAdd(sds, SDS{ - "hash": ToBase32(info.Hash[:]), - "size": strconv.FormatInt(int64(info.Size), 10), - "nice": strconv.Itoa(int(info.Nice)), + "pkt": ToBase32(info.Hash[:]), + "size": int64(info.Size), + "nice": int(info.Nice), }) if !state.listOnly && info.Nice > state.Nice { state.Ctx.LogD("sp-process", sdsp, "too nice") @@ -815,7 +817,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } state.Ctx.LogI( "sp-info", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}), + SdsAdd(sdsp, SDS{"offset": offset}), "", ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { @@ -829,21 +831,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ - "err": err, - "type": "file", - }), "") + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "") return nil, err } sdsp["xx"] = string(TRx) - sdsp["hash"] = ToBase32(file.Hash[:]) - sdsp["size"] = strconv.Itoa(len(file.Payload)) - filePath := filepath.Join( + sdsp["pkt"] = ToBase32(file.Hash[:]) + sdsp["size"] = len(file.Payload) + dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), - ToBase32(file.Hash[:]), ) + filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:])) state.Ctx.LogD("sp-file", sdsp, "opening part") fd, err := os.OpenFile( filePath+PartSuffix, @@ -851,31 +850,33 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { os.FileMode(0666), ) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") return nil, err } state.Ctx.LogD( "sp-file", - SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}), + SdsAdd(sdsp, SDS{"offset": file.Offset}), "seeking", ) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") fd.Close() return nil, err } state.Ctx.LogD("sp-file", sdsp, "writing") _, err = fd.Write(file.Payload) if err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-file", sdsp, err, "") fd.Close() return nil, err } - ourSize := uint64(file.Offset) + uint64(len(file.Payload)) + ourSize := file.Offset + uint64(len(file.Payload)) state.RLock() - sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10) - sdsp["size"] = strconv.FormatInt(int64(ourSize), 10) - state.Ctx.LogP("sp-file", sdsp, "") + sdsp["size"] = int64(ourSize) + sdsp["fullsize"] = int64(state.infosTheir[*file.Hash].Size) + if state.Ctx.ShowPrgrs { + Progress(sdsp) + } if state.infosTheir[*file.Hash].Size != ourSize { state.RUnlock() fd.Close() @@ -886,7 +887,7 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { spWorkersGroup.Add(1) go func() { if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync") + state.Ctx.LogE("sp-file", sdsp, err, "sync") fd.Close() return } @@ -894,14 +895,21 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { defer state.wg.Done() fd.Seek(0, io.SeekStart) state.Ctx.LogD("sp-file", sdsp, "checking") - gut, err := Check(fd, file.Hash[:]) + gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs) fd.Close() if err != nil || !gut { - state.Ctx.LogE("sp-file", sdsp, "checksum mismatch") + state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "") return } state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "") - os.Rename(filePath+PartSuffix, filePath) + if err = os.Rename(filePath+PartSuffix, filePath); err != nil { + state.Ctx.LogE("sp-file", sdsp, err, "rename") + return + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file", sdsp, err, "sync") + return + } state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() @@ -915,13 +923,10 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{ - "type": "done", - "err": err, - }), "") + state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "") return nil, err } - sdsp["hash"] = ToBase32(done.Hash[:]) + sdsp["pkt"] = ToBase32(done.Hash[:]) state.Ctx.LogD("sp-done", sdsp, "removing") err := os.Remove(filepath.Join( state.Ctx.Spool, @@ -933,18 +938,18 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { if err == nil { state.Ctx.LogI("sp-done", sdsp, "") } else { - state.Ctx.LogE("sp-done", sdsp, "") + state.Ctx.LogE("sp-done", sdsp, err, "") } case SPTypeFreq: sdsp := SdsAdd(sds, SDS{"type": "freq"}) state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet") var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "") + state.Ctx.LogE("sp-process", sdsp, err, "") return nil, err } - sdsp["hash"] = ToBase32(freq.Hash[:]) - sdsp["offset"] = strconv.FormatInt(int64(freq.Offset), 10) + sdsp["pkt"] = ToBase32(freq.Hash[:]) + sdsp["offset"] = freq.Offset state.Ctx.LogD("sp-process", sdsp, "queueing") nice, exists := state.infosOurSeen[*freq.Hash] if exists { @@ -976,7 +981,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogE( "sp-process", SdsAdd(sds, SDS{"type": head.Type}), - "unknown", + errors.New("unknown type"), + "", ) return nil, BadPktType } @@ -993,8 +999,8 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Ctx.LogI("sp-infos", SDS{ "xx": string(TRx), "node": state.Node.Id, - "pkts": strconv.Itoa(pkts), - "size": strconv.FormatInt(int64(size), 10), + "pkts": pkts, + "size": int64(size), }, "") } return payloadsSplit(replies), nil