X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=67bb659a672426895919fcd5961633c75d455df9;hb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;hp=a509207422ccee4479b514b77c70736acc86e018;hpb=9c25694667c14c934fc122741b9736988172f7de;p=nncp.git diff --git a/src/tx.go b/src/tx.go index a509207..67bb659 100644 --- a/src/tx.go +++ b/src/tx.go @@ -18,6 +18,7 @@ along with this program. If not, see . package nncp import ( + "archive/tar" "bufio" "bytes" "crypto/rand" @@ -29,24 +30,29 @@ import ( "path/filepath" "strconv" "strings" + "time" - "github.com/davecgh/go-xdr/xdr2" + xdr "github.com/davecgh/go-xdr/xdr2" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" "golang.org/x/crypto/chacha20poly1305" ) +const ( + MaxFileSize = 1 << 62 + + TarBlockSize = 512 + TarExt = ".tar" +) + func (ctx *Ctx) Tx( node *Node, pkt *Pkt, nice uint8, size, minSize int64, src io.Reader, + pktName string, ) (*Node, error) { - tmp, err := ctx.NewTmpFileWHash() - if err != nil { - return nil, err - } hops := make([]*Node, 0, 1+len(node.Via)) hops = append(hops, node) lastNode := node @@ -62,14 +68,22 @@ func (ctx *Ctx) Tx( if padSize < 0 { padSize = 0 } + if !ctx.IsEnoughSpace(size + padSize) { + return nil, errors.New("is not enough space") + } + tmp, err := ctx.NewTmpFileWHash() + if err != nil { + return nil, err + } + errs := make(chan error) curSize := size pipeR, pipeW := io.Pipe() go func(size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", SDS{ "node": hops[0].Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, }, "wrote") errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) dst.Close() @@ -84,8 +98,8 @@ func (ctx *Ctx) Tx( go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", SDS{ "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, }, "trns wrote") errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) dst.Close() @@ -93,7 +107,11 @@ func (ctx *Ctx) Tx( curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) } go func() { - _, err := io.Copy(tmp.W, pipeR) + _, err := CopyProgressed( + tmp.W, pipeR, + SDS{"xx": string(TTx), "pkt": pktName, "fullsize": curSize}, + ctx.ShowPrgrs, + ) errs <- err }() for i := 0; i <= len(hops); i++ { @@ -109,142 +127,217 @@ func (ctx *Ctx) Tx( return lastNode, err } -func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) { - var reader io.Reader - var src *os.File - var fileSize int64 - var err error +type DummyCloser struct{} + +func (dc DummyCloser) Close() error { return nil } + +func prepareTxFile(srcPath string) (reader io.Reader, closer io.Closer, fileSize int64, archived bool, rerr error) { if srcPath == "-" { - src, err = ioutil.TempFile("", "nncp-file") + // Read content from stdin, saving to temporary file, encrypting + // on the fly + src, err := ioutil.TempFile("", "nncp-file") if err != nil { - return nil, nil, 0, err + rerr = err + return } os.Remove(src.Name()) tmpW := bufio.NewWriter(src) tmpKey := make([]byte, chacha20poly1305.KeySize) - if _, err = rand.Read(tmpKey[:]); err != nil { - return nil, nil, 0, err + if _, rerr = rand.Read(tmpKey[:]); rerr != nil { + return } aead, err := chacha20poly1305.New(tmpKey) if err != nil { - return nil, nil, 0, err + rerr = err + return } nonce := make([]byte, aead.NonceSize()) written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW) if err != nil { - return nil, nil, 0, err + rerr = err + return } fileSize = int64(written) if err = tmpW.Flush(); err != nil { - return nil, nil, 0, err + return } src.Seek(0, io.SeekStart) r, w := io.Pipe() go func() { if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil { - panic(err) + w.CloseWithError(err) } }() reader = r - } else { - src, err = os.Open(srcPath) - if err != nil { - return nil, nil, 0, err - } - srcStat, err := src.Stat() + closer = src + return + } + + srcStat, err := os.Stat(srcPath) + if err != nil { + rerr = err + return + } + mode := srcStat.Mode() + + if mode.IsRegular() { + // It is regular file, just send it + src, err := os.Open(srcPath) if err != nil { - return nil, nil, 0, err + rerr = err + return } fileSize = srcStat.Size() reader = bufio.NewReader(src) + closer = src + return } - return reader, src, fileSize, nil -} -func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error { - if dstPath == "" { - if srcPath == "-" { - return errors.New("Must provide destination filename") - } - dstPath = filepath.Base(srcPath) - } - dstPath = filepath.Clean(dstPath) - if filepath.IsAbs(dstPath) { - return errors.New("Relative destination path required") - } - pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) - if err != nil { - return err - } - reader, src, fileSize, err := prepareTxFile(srcPath) - if src != nil { - defer src.Close() + if !mode.IsDir() { + rerr = errors.New("unsupported file type") + return } + + // It is directory, create PAX archive with its contents + archived = true + basePath := filepath.Base(srcPath) + rootPath, err := filepath.Abs(srcPath) if err != nil { - return err + rerr = err + return } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader) - sds := SDS{ - "type": "file", - "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "src": srcPath, - "dst": dstPath, - "size": strconv.FormatInt(fileSize, 10), + type einfo struct { + path string + modTime time.Time + size int64 } - if err == nil { - ctx.LogI("tx", sds, "sent") - } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + dirs := make([]einfo, 0, 1<<10) + files := make([]einfo, 0, 1<<10) + rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + // directory header, PAX record header+contents + fileSize += TarBlockSize + 2*TarBlockSize + dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) + } else { + // file header, PAX record header+contents, file content + fileSize += TarBlockSize + 2*TarBlockSize + info.Size() + if n := info.Size() % TarBlockSize; n != 0 { + fileSize += TarBlockSize - n // padding + } + files = append(files, einfo{ + path: path, + modTime: info.ModTime(), + size: info.Size(), + }) + } + return nil + }) + if rerr != nil { + return } - return err + + r, w := io.Pipe() + reader = r + closer = DummyCloser{} + fileSize += 2 * TarBlockSize // termination block + + go func() { + tarWr := tar.NewWriter(w) + hdr := tar.Header{ + Typeflag: tar.TypeDir, + Mode: 0777, + PAXRecords: map[string]string{ + "comment": "Autogenerated by " + VersionGet(), + }, + Format: tar.FormatPAX, + } + for _, e := range dirs { + hdr.Name = basePath + e.path[len(rootPath):] + hdr.ModTime = e.modTime + if err = tarWr.WriteHeader(&hdr); err != nil { + w.CloseWithError(err) + } + } + hdr.Typeflag = tar.TypeReg + hdr.Mode = 0666 + for _, e := range files { + hdr.Name = basePath + e.path[len(rootPath):] + hdr.ModTime = e.modTime + hdr.Size = e.size + if err = tarWr.WriteHeader(&hdr); err != nil { + w.CloseWithError(err) + } + fd, err := os.Open(e.path) + if err != nil { + w.CloseWithError(err) + } + _, err = io.Copy(tarWr, bufio.NewReader(fd)) + if err != nil { + w.CloseWithError(err) + } + fd.Close() + } + tarWr.Close() + w.Close() + }() + return } -func (ctx *Ctx) TxFileChunked( +func (ctx *Ctx) TxFile( node *Node, nice uint8, srcPath, dstPath string, - minSize int64, chunkSize int64, + minSize, maxSize int64, ) error { + dstPathSpecified := false if dstPath == "" { if srcPath == "-" { return errors.New("Must provide destination filename") } dstPath = filepath.Base(srcPath) + } else { + dstPathSpecified = true } dstPath = filepath.Clean(dstPath) if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") } - reader, src, fileSize, err := prepareTxFile(srcPath) - if src != nil { - defer src.Close() + reader, closer, fileSize, archived, err := prepareTxFile(srcPath) + if closer != nil { + defer closer.Close() } if err != nil { return err } + if fileSize > maxSize { + return errors.New("Too big than allowed") + } + if archived && !dstPathSpecified { + dstPath += TarExt + } if fileSize <= chunkSize { pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader) + _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": dstPath, - "size": strconv.FormatInt(fileSize, 10), + "size": fileSize, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -287,20 +380,20 @@ func (ctx *Ctx) TxFileChunked( sizeToSend, minSize, io.TeeReader(reader, hsh), + path, ) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": path, - "size": strconv.FormatInt(sizeToSend, 10), + "size": sizeToSend, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -321,20 +414,19 @@ func (ctx *Ctx) TxFileChunked( return err } metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf) + _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path) sds := SDS{ "type": "file", "node": node.Id, - "nice": strconv.Itoa(int(nice)), + "nice": int(nice), "src": srcPath, "dst": path, - "size": strconv.FormatInt(metaPktSize, 10), + "size": metaPktSize, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -358,20 +450,19 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src) + _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath) sds := SDS{ "type": "freq", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "replynice": strconv.Itoa(int(replyNice)), + "nice": int(nice), + "replynice": int(replyNice), "src": srcPath, "dst": dstPath, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -407,20 +498,19 @@ func (ctx *Ctx) TxExec( return err } size := int64(compressed.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed) + _, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle) sds := SDS{ "type": "exec", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "replynice": strconv.Itoa(int(replyNice)), + "nice": int(nice), + "replynice": int(replyNice), "dst": strings.Join(append([]string{handle}, args...), " "), - "size": strconv.FormatInt(size, 10), + "size": size, } if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogE("tx", sds, "sent") + ctx.LogE("tx", sds, err, "sent") } return err } @@ -429,15 +519,24 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error sds := SDS{ "type": "trns", "node": node.Id, - "nice": strconv.Itoa(int(nice)), - "size": strconv.FormatInt(size, 10), + "nice": int(nice), + "size": size, } ctx.LogD("tx", sds, "taken") + if !ctx.IsEnoughSpace(size) { + err := errors.New("is not enough space") + ctx.LogE("tx", sds, err, err.Error()) + return err + } tmp, err := ctx.NewTmpFileWHash() if err != nil { return err } - if _, err = io.Copy(tmp.W, src); err != nil { + if _, err = CopyProgressed(tmp.W, src, SDS{ + "xx": string(TTx), + "pkt": node.Id.String(), + "fullsize": size, + }, ctx.ShowPrgrs); err != nil { return err } nodePath := filepath.Join(ctx.Spool, node.Id.String()) @@ -445,8 +544,7 @@ func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error if err == nil { ctx.LogI("tx", sds, "sent") } else { - sds["err"] = err - ctx.LogI("tx", sds, "sent") + ctx.LogI("tx", SdsAdd(sds, SDS{"err": err}), "sent") } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) return err