X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=67bb659a672426895919fcd5961633c75d455df9;hb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;hp=7d497769031695cfd058317b3511ca851028592d;hpb=fdaeef8f33fb95b920054db69212e6f06fb3dbc5;p=nncp.git diff --git a/src/tx.go b/src/tx.go index 7d49776..67bb659 100644 --- a/src/tx.go +++ b/src/tx.go @@ -51,6 +51,7 @@ func (ctx *Ctx) Tx( nice uint8, size, minSize int64, src io.Reader, + pktName string, ) (*Node, error) { hops := make([]*Node, 0, 1+len(node.Via)) hops = append(hops, node) @@ -81,8 +82,8 @@ func (ctx *Ctx) Tx( go func(size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", SDS{ "node": hops[0].Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, }, "wrote") errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) dst.Close() @@ -97,8 +98,8 @@ func (ctx *Ctx) Tx( go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", SDS{ "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, }, "trns wrote") errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) dst.Close() @@ -106,7 +107,11 @@ func (ctx *Ctx) Tx( curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) } go func() { - _, err := io.Copy(tmp.W, pipeR) + _, err := CopyProgressed( + tmp.W, pipeR, + SDS{"xx": string(TTx), "pkt": pktName, "fullsize": curSize}, + ctx.ShowPrgrs, + ) errs <- err }() for i := 0; i <= len(hops); i++ { @@ -286,7 +291,7 @@ func (ctx *Ctx) TxFile( nice uint8, srcPath, dstPath string, chunkSize int64, - minSize int64, + minSize, maxSize int64, ) error { dstPathSpecified := false if dstPath == "" { @@ -308,6 +313,9 @@ func (ctx *Ctx) TxFile( if err != nil { return err } + if fileSize > maxSize { + return errors.New("Too big than allowed") + } if archived && !dstPathSpecified { dstPath += TarExt } @@ -317,19 +325,19 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader) + _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": dstPath, - "size": strconv.FormatInt(fileSize, 10), + "size": fileSize, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -372,19 +380,20 @@ func (ctx *Ctx) TxFile( sizeToSend, minSize, io.TeeReader(reader, hsh), + path, ) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": path, - "size": strconv.FormatInt(sizeToSend, 10), + "size": sizeToSend, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogE("tx", sds, err, "sent") return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -405,19 +414,19 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf) + _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": path, - "size": strconv.FormatInt(metaPktSize, 10), + "size": metaPktSize, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -441,19 +450,19 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src) + _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) sds := SDS{ "type": "freq", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "replynice": strconv.Itoa(int(replyNice)), + "nice": int(nice), + "replynice": int(replyNice), "src": srcPath, "dst": dstPath, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -489,19 +498,19 @@ func (ctx *Ctx) TxExec( return err } size := int64(compressed.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed) + _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle) sds := SDS{ "type": "exec", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "replynice": strconv.Itoa(int(replyNice)), + "nice": int(nice), + "replynice": int(replyNice), "dst": strings.Join(append([]string{handle}, args...), " "), - "size": strconv.FormatInt(size, 10), + "size": size, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -510,20 +519,24 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error sds := SDS{ "type": "trns", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, } ctx.LogD("tx", sds, "taken") if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", SdsAdd(sds, SDS{"err": err}), err.Error()) + ctx.LogE("tx", sds, err, err.Error()) return err } tmp, err := ctx.NewTmpFileWHash() if err != nil { return err } - if _, err = io.Copy(tmp.W, src); err != nil { + if _, err = CopyProgressed(tmp.W, src, SDS{ + "xx": string(TTx), + "pkt": node.Id.String(), + "fullsize": size, + }, ctx.ShowPrgrs); err != nil { return err } nodePath := filepath.Join(ctx.Spool, node.Id.String())