X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=67bb659a672426895919fcd5961633c75d455df9;hb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;hp=4b7cb347cc992a3f4f4e58152671d61581bbe275;hpb=a951a9623212267cb820eee6dd9af605512f6e01;p=nncp.git diff --git a/src/tx.go b/src/tx.go index 4b7cb34..67bb659 100644 --- a/src/tx.go +++ b/src/tx.go @@ -39,6 +39,8 @@ import ( ) const ( + MaxFileSize = 1 << 62 + TarBlockSize = 512 TarExt = ".tar" ) @@ -49,6 +51,7 @@ func (ctx *Ctx) Tx( nice uint8, size, minSize int64, src io.Reader, + pktName string, ) (*Node, error) { hops := make([]*Node, 0, 1+len(node.Via)) hops = append(hops, node) @@ -79,8 +82,8 @@ func (ctx *Ctx) Tx( go func(size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", SDS{ "node": hops[0].Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, }, "wrote") errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) dst.Close() @@ -95,8 +98,8 @@ func (ctx *Ctx) Tx( go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", SDS{ "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, }, "trns wrote") errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) dst.Close() @@ -104,7 +107,11 @@ func (ctx *Ctx) Tx( curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) } go func() { - _, err := io.Copy(tmp.W, pipeR) + _, err := CopyProgressed( + tmp.W, pipeR, + SDS{"xx": string(TTx), "pkt": pktName, "fullsize": curSize}, + ctx.ShowPrgrs, + ) errs <- err }() for i := 0; i <= len(hops); i++ { @@ -279,58 +286,12 @@ func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize return } -func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error { - dstPathSpecified := false - if dstPath == "" { - if srcPath == "-" { - return errors.New("Must provide destination filename") - } - dstPath = filepath.Base(srcPath) - } else { - dstPathSpecified = true - } - dstPath = filepath.Clean(dstPath) - if filepath.IsAbs(dstPath) { - return errors.New("Relative destination path required") - } - reader, closer, fileSize, archived, err := prepareTxFile(srcPath) - if closer != nil { - defer closer.Close() - } - if err != nil { - return err - } - if archived && !dstPathSpecified { - dstPath += TarExt - } - pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) - if err != nil { - return err - } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "src": srcPath, - "dst": dstPath, - "size": strconv.FormatInt(fileSize, 10), - } - if err == nil { - ctx.LogI("tx", sds, "sent") - } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") - } - return err -} - -func (ctx *Ctx) TxFileChunked( +func (ctx *Ctx) TxFile( node *Node, nice uint8, srcPath, dstPath string, - minSize int64, chunkSize int64, + minSize, maxSize int64, ) error { dstPathSpecified := false if dstPath == "" { @@ -352,6 +313,9 @@ func (ctx *Ctx) TxFileChunked( if err != nil { return err } + if fileSize > maxSize { + return errors.New("Too big than allowed") + } if archived && !dstPathSpecified { dstPath += TarExt } @@ -361,20 +325,19 @@ func (ctx *Ctx) TxFileChunked( if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader) + _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": dstPath, - "size": strconv.FormatInt(fileSize, 10), + "size": fileSize, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -417,20 +380,20 @@ func (ctx *Ctx) TxFileChunked( sizeToSend, minSize, io.TeeReader(reader, hsh), + path, ) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": path, - "size": strconv.FormatInt(sizeToSend, 10), + "size": sizeToSend, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -451,20 +414,19 @@ func (ctx *Ctx) TxFileChunked( return err } metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf) + _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": path, - "size": strconv.FormatInt(metaPktSize, 10), + "size": metaPktSize, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -488,20 +450,19 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src) + _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) sds := SDS{ "type": "freq", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "replynice": strconv.Itoa(int(replyNice)), + "nice": int(nice), + "replynice": int(replyNice), "src": srcPath, "dst": dstPath, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -537,20 +498,19 @@ func (ctx *Ctx) TxExec( return err } size := int64(compressed.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed) + _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle) sds := SDS{ "type": "exec", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "replynice": strconv.Itoa(int(replyNice)), + "nice": int(nice), + "replynice": int(replyNice), "dst": strings.Join(append([]string{handle}, args...), " "), - "size": strconv.FormatInt(size, 10), + "size": size, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -559,20 +519,24 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error sds := SDS{ "type": "trns", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, } ctx.LogD("tx", sds, "taken") if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", sds, err.Error()) + ctx.LogE("tx", sds, err, err.Error()) return err } tmp, err := ctx.NewTmpFileWHash() if err != nil { return err } - if _, err = io.Copy(tmp.W, src); err != nil { + if _, err = CopyProgressed(tmp.W, src, SDS{ + "xx": string(TTx), + "pkt": node.Id.String(), + "fullsize": size, + }, ctx.ShowPrgrs); err != nil { return err } nodePath := filepath.Join(ctx.Spool, node.Id.String()) @@ -580,8 +544,7 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent") } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) return err