X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftx.go;h=f9da81b03093f6c82d500da82d5f492dfa056f33;hb=5d5c951d8fecf27acbe3895091637a70bf7f7f39;hp=cb8153fe0419705b991c621a662396e1238c5e64;hpb=c8b26fe06596d26bdb14c5be85760fb3ddb197b3;p=nncp.git

diff --git a/src/tx.go b/src/tx.go
index cb8153f..f9da81b 100644
--- a/src/tx.go
+++ b/src/tx.go
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -18,34 +18,52 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
 package nncp
 
 import (
+	"archive/tar"
 	"bufio"
 	"bytes"
-	"compress/zlib"
-	"crypto/rand"
 	"errors"
-	"hash"
+	"fmt"
 	"io"
-	"io/ioutil"
 	"os"
 	"path/filepath"
 	"strconv"
 	"strings"
+	"time"
 
-	"github.com/davecgh/go-xdr/xdr2"
+	xdr "github.com/davecgh/go-xdr/xdr2"
+	"github.com/dustin/go-humanize"
+	"github.com/klauspost/compress/zstd"
 	"golang.org/x/crypto/blake2b"
-	"golang.org/x/crypto/chacha20poly1305"
 )
 
+const (
+	MaxFileSize = 1 << 62
+
+	TarBlockSize = 512
+	TarExt       = ".tar"
+)
+
+type PktEncWriteResult struct {
+	pktEncRaw []byte
+	size      int64
+	err       error
+}
+
 func (ctx *Ctx) Tx(
 	node *Node,
 	pkt *Pkt,
 	nice uint8,
-	size, minSize int64,
+	srcSize, minSize, maxSize int64,
 	src io.Reader,
-) (*Node, error) {
-	tmp, err := ctx.NewTmpFileWHash()
-	if err != nil {
-		return nil, err
+	pktName string,
+	areaId *AreaId,
+) (*Node, int64, error) {
+	var area *Area
+	if areaId != nil {
+		area = ctx.AreaId2Area[*areaId]
+		if area.Prv == nil {
+			return nil, 0, errors.New("area has no encryption keys")
+		}
 	}
 	hops := make([]*Node, 0, 1+len(node.Via))
 	hops = append(hops, node)
@@ -54,287 +72,503 @@ func (ctx *Ctx) Tx(
 		lastNode = ctx.Neigh[*node.Via[i-1]]
 		hops = append(hops, lastNode)
 	}
-	expectedSize := size
-	for i := 0; i < len(hops); i++ {
-		expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize)
+	wrappers := len(hops)
+	if area != nil {
+		wrappers++
+	}
+	var expectedSize int64
+	if srcSize > 0 {
+		expectedSize = srcSize + PktOverhead
+		expectedSize += sizePadCalc(expectedSize, minSize, wrappers)
+		expectedSize = PktEncOverhead + sizeWithTags(expectedSize)
+		if maxSize != 0 && expectedSize > maxSize {
+			return nil, 0, TooBig
+		}
+		if !ctx.IsEnoughSpace(expectedSize) {
+			return nil, 0, errors.New("is not enough space")
+		}
 	}
-	padSize := minSize - expectedSize
-	if padSize < 0 {
-		padSize = 0
+	tmp, err := ctx.NewTmpFileWHash()
+	if err != nil {
+		return nil, 0, err
 	}
-	errs := make(chan error)
-	curSize := size
-	pipeR, pipeW := io.Pipe()
-	go func(size int64, src io.Reader, dst io.WriteCloser) {
-		ctx.LogD("tx", SDS{
-			"node": hops[0].Id,
-			"nice": strconv.Itoa(int(nice)),
-			"size": strconv.FormatInt(size, 10),
-		}, "wrote")
-		errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst)
-		dst.Close()
-	}(curSize, src, pipeW)
-	curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize
+	results := make(chan PktEncWriteResult)
+	pipeR, pipeW := io.Pipe()
 	var pipeRPrev io.Reader
+	if area == nil {
+		go func(src io.Reader, dst io.WriteCloser) {
+			ctx.LogD("tx", LEs{
+				{"Node", hops[0].Id},
+				{"Nice", int(nice)},
+				{"Size", expectedSize},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx packet to %s (source %s) nice: %s",
+					ctx.NodeName(hops[0].Id),
+					humanize.IBytes(uint64(expectedSize)),
+					NicenessFmt(nice),
+				)
+			})
+			pktEncRaw, size, err := PktEncWrite(
+				ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst,
+			)
+			results <- PktEncWriteResult{pktEncRaw, size, err}
+			dst.Close()
+		}(src, pipeW)
+	} else {
+		go func(src io.Reader, dst io.WriteCloser) {
+			ctx.LogD("tx", LEs{
+				{"Area", area.Id},
+				{"Nice", int(nice)},
+				{"Size", expectedSize},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx area packet to %s (source %s) nice: %s",
+					ctx.AreaName(areaId),
+					humanize.IBytes(uint64(expectedSize)),
+					NicenessFmt(nice),
+				)
+			})
+			areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
+			copy(areaNode.Id[:], area.Id[:])
+			copy(areaNode.ExchPub[:], area.Pub[:])
+			pktEncRaw, size, err := PktEncWrite(
+				ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst,
+			)
+			results <- PktEncWriteResult{pktEncRaw, size, err}
+			dst.Close()
+		}(src, pipeW)
+		pipeRPrev = pipeR
+		pipeR, pipeW = io.Pipe()
+		go func(src io.Reader, dst io.WriteCloser) {
+			pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
+			if err != nil {
+				panic(err)
+			}
+			ctx.LogD("tx", LEs{
+				{"Node", hops[0].Id},
+				{"Nice", int(nice)},
+				{"Size", expectedSize},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx packet to %s (source %s) nice: %s",
+					ctx.NodeName(hops[0].Id),
+					humanize.IBytes(uint64(expectedSize)),
+					NicenessFmt(nice),
+				)
+			})
+			pktEncRaw, size, err := PktEncWrite(
+				ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst,
+			)
+			results <- PktEncWriteResult{pktEncRaw, size, err}
+			dst.Close()
+		}(pipeRPrev, pipeW)
+	}
 	for i := 1; i < len(hops); i++ {
-		pktTrns, _ := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
+		pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
+		if err != nil {
+			panic(err)
+		}
 		pipeRPrev = pipeR
 		pipeR, pipeW = io.Pipe()
-		go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) {
-			ctx.LogD("tx", SDS{
-				"node": node.Id,
-				"nice": strconv.Itoa(int(nice)),
-				"size": strconv.FormatInt(size, 10),
-			}, "trns wrote")
-			errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst)
+		go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) {
+			ctx.LogD("tx", LEs{
+				{"Node", node.Id},
+				{"Nice", int(nice)},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx trns packet to %s nice: %s",
+					ctx.NodeName(node.Id),
+					NicenessFmt(nice),
+				)
+			})
+			pktEncRaw, size, err := PktEncWrite(
+				ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst,
+			)
+			results <- PktEncWriteResult{pktEncRaw, size, err}
 			dst.Close()
-		}(hops[i], pktTrns, curSize, pipeRPrev, pipeW)
-		curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+		}(hops[i], pktTrns, pipeRPrev, pipeW)
 	}
 	go func() {
-		_, err := io.Copy(tmp.W, pipeR)
-		errs <- err
+		_, err := CopyProgressed(
+			tmp.W, pipeR, "Tx",
+			LEs{{"Pkt", pktName}, {"FullSize", expectedSize}},
+			ctx.ShowPrgrs,
+		)
+		results <- PktEncWriteResult{err: err}
 	}()
-	for i := 0; i <= len(hops); i++ {
-		err = <-errs
-		if err != nil {
+	var pktEncRaw []byte
+	var pktEncMsg []byte
+	var payloadSize int64
+	if area != nil {
+		r := <-results
+		payloadSize = r.size
+		pktEncMsg = r.pktEncRaw
+		wrappers--
+	}
+	for i := 0; i <= wrappers; i++ {
+		r := <-results
+		if r.err != nil {
 			tmp.Fd.Close()
-			return nil, err
+			return nil, 0, r.err
+		}
+		if r.pktEncRaw != nil {
+			pktEncRaw = r.pktEncRaw
+			if payloadSize == 0 {
+				payloadSize = r.size
+			}
 		}
 	}
 	nodePath := filepath.Join(ctx.Spool, lastNode.Id.String())
 	err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
 	os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name))
-	return lastNode, err
-}
-
-func prepareTxFile(srcPath string) (io.Reader, *os.File, int64, error) {
-	var reader io.Reader
-	var src *os.File
-	var fileSize int64
-	var err error
-	if srcPath == "-" {
-		src, err = ioutil.TempFile("", "nncp-file")
-		if err != nil {
-			return nil, nil, 0, err
-		}
-		os.Remove(src.Name())
-		tmpW := bufio.NewWriter(src)
-		tmpKey := make([]byte, chacha20poly1305.KeySize)
-		if _, err = rand.Read(tmpKey[:]); err != nil {
-			return nil, nil, 0, err
-		}
-		aead, err := chacha20poly1305.New(tmpKey)
-		if err != nil {
-			return nil, nil, 0, err
+	if err != nil {
+		return lastNode, 0, err
+	}
+	if ctx.HdrUsage {
+		ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
+	}
+	if area != nil {
+		msgHashRaw := blake2b.Sum256(pktEncMsg)
+		msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
+		seenDir := filepath.Join(
+			ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
+		)
+		seenPath := filepath.Join(seenDir, msgHash)
+		les := LEs{
+			{"Node", node.Id},
+			{"Nice", int(nice)},
+			{"Size", expectedSize},
+			{"Area", areaId},
+			{"AreaMsg", msgHash},
 		}
-		nonce := make([]byte, aead.NonceSize())
-		written, err := aeadProcess(aead, nonce, true, bufio.NewReader(os.Stdin), tmpW)
-		if err != nil {
-			return nil, nil, 0, err
+		logMsg := func(les LEs) string {
+			return fmt.Sprintf(
+				"Tx area packet to %s (source %s) nice: %s, area %s: %s",
+				ctx.NodeName(node.Id),
+				humanize.IBytes(uint64(expectedSize)),
+				NicenessFmt(nice),
+				area.Name,
+				msgHash,
+			)
 		}
-		fileSize = int64(written)
-		if err = tmpW.Flush(); err != nil {
-			return nil, nil, 0, err
+		if err = ensureDir(seenDir); err != nil {
+			ctx.LogE("tx-mkdir", les, err, logMsg)
+			return lastNode, 0, err
 		}
-		src.Seek(0, io.SeekStart)
-		r, w := io.Pipe()
-		go func() {
-			if _, err := aeadProcess(aead, nonce, false, bufio.NewReader(src), w); err != nil {
-				panic(err)
+		if fd, err := os.Create(seenPath); err == nil {
+			fd.Close()
+			if err = DirSync(seenDir); err != nil {
+				ctx.LogE("tx-dirsync", les, err, logMsg)
+				return lastNode, 0, err
 			}
-		}()
-		reader = r
-	} else {
-		src, err = os.Open(srcPath)
-		if err != nil {
-			return nil, nil, 0, err
 		}
-		srcStat, err := src.Stat()
-		if err != nil {
-			return nil, nil, 0, err
-		}
-		fileSize = srcStat.Size()
-		reader = bufio.NewReader(src)
+		ctx.LogI("tx-area", les, logMsg)
 	}
-	return reader, src, fileSize, nil
+	return lastNode, payloadSize, err
 }
 
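The rewritten Tx streams the whole encryption onion: the innermost PktEncWrite goroutine reads the source, each further hop (plus the optional area layer) reads the previous goroutine's pipe, and the outermost pipe drains into the spool file, so no intermediate layer is ever buffered in full. A minimal standalone sketch of that chaining pattern, with a hypothetical encryptHop merely tagging the stream where PktEncWrite would encrypt it:

package main

import (
	"fmt"
	"io"
	"strings"
)

// encryptHop is a stand-in for PktEncWrite: it wraps src's bytes in one
// "layer" and reports its result on errs, closing dst so the next reader
// in the chain sees EOF.
func encryptHop(label string, src io.Reader, dst io.WriteCloser, errs chan<- error) {
	defer dst.Close()
	_, err := fmt.Fprintf(dst, "%s(", label)
	if err == nil {
		_, err = io.Copy(dst, src)
	}
	if err == nil {
		_, err = io.WriteString(dst, ")")
	}
	errs <- err
}

func main() {
	hops := []string{"neighbour", "transit1", "transit2"} // innermost first, like hops[0]
	errs := make(chan error, len(hops))
	var src io.Reader = strings.NewReader("payload")
	for _, hop := range hops {
		r, w := io.Pipe()
		go encryptHop(hop, src, w, errs)
		src = r // the next layer consumes this layer's output
	}
	out, err := io.ReadAll(src) // stands in for the spool-file copier goroutine
	if err != nil {
		panic(err)
	}
	for range hops {
		if err := <-errs; err != nil {
			panic(err)
		}
	}
	fmt.Println(string(out)) // transit2(transit1(neighbour(payload)))
}
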
-func (ctx *Ctx) TxFile(node *Node, nice uint8, srcPath, dstPath string, minSize int64) error {
-	if dstPath == "" {
-		if srcPath == "-" {
-			return errors.New("Must provide destination filename")
-		}
-		dstPath = filepath.Base(srcPath)
-	}
-	dstPath = filepath.Clean(dstPath)
-	if filepath.IsAbs(dstPath) {
-		return errors.New("Relative destination path required")
+type DummyCloser struct{}
+
+func (dc DummyCloser) Close() error { return nil }
+
+func prepareTxFile(srcPath string) (
+	reader io.Reader,
+	closer io.Closer,
+	srcSize int64,
+	archived bool,
+	rerr error,
+) {
+	if srcPath == "-" {
+		reader = os.Stdin
+		closer = os.Stdin
+		return
 	}
-	pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
+
+	srcStat, err := os.Stat(srcPath)
 	if err != nil {
-		return err
+		rerr = err
+		return
 	}
-	reader, src, fileSize, err := prepareTxFile(srcPath)
-	if src != nil {
-		defer src.Close()
+	mode := srcStat.Mode()
+
+	if mode.IsRegular() {
+		// It is regular file, just send it
+		src, err := os.Open(srcPath)
+		if err != nil {
+			rerr = err
+			return
+		}
+		reader = src
+		closer = src
+		srcSize = srcStat.Size()
+		return
 	}
+
+	if !mode.IsDir() {
+		rerr = errors.New("unsupported file type")
+		return
+	}
+
+	// It is directory, create PAX archive with its contents
+	archived = true
+	basePath := filepath.Base(srcPath)
+	rootPath, err := filepath.Abs(srcPath)
 	if err != nil {
-		return err
+		rerr = err
+		return
 	}
-	_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
-	sds := SDS{
-		"type": "file",
-		"node": node.Id,
-		"nice": strconv.Itoa(int(nice)),
-		"src":  srcPath,
-		"dst":  dstPath,
-		"size": strconv.FormatInt(fileSize, 10),
+	type einfo struct {
+		path    string
+		modTime time.Time
+		size    int64
 	}
-	if err == nil {
-		ctx.LogI("tx", sds, "sent")
-	} else {
-		sds["err"] = err
-		ctx.LogE("tx", sds, "sent")
+	dirs := make([]einfo, 0, 1<<10)
+	files := make([]einfo, 0, 1<<10)
+	rerr = filepath.Walk(rootPath, func(path string, info os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if info.Mode().IsDir() {
+			// directory header, PAX record header+contents
+			srcSize += TarBlockSize + 2*TarBlockSize
+			dirs = append(dirs, einfo{path: path, modTime: info.ModTime()})
+		} else if info.Mode().IsRegular() {
+			// file header, PAX record header+contents, file content
+			srcSize += TarBlockSize + 2*TarBlockSize + info.Size()
+			if n := info.Size() % TarBlockSize; n != 0 {
+				srcSize += TarBlockSize - n // padding
+			}
+			files = append(files, einfo{
+				path:    path,
+				modTime: info.ModTime(),
+				size:    info.Size(),
+			})
+		}
+		return nil
+	})
+	if rerr != nil {
+		return
 	}
-	return err
+
+	r, w := io.Pipe()
+	reader = r
+	closer = DummyCloser{}
+	srcSize += 2 * TarBlockSize // termination block
+
+	go func() error {
+		tarWr := tar.NewWriter(w)
+		hdr := tar.Header{
+			Typeflag: tar.TypeDir,
+			Mode:     0777,
+			PAXRecords: map[string]string{
+				"comment": "Autogenerated by " + VersionGet(),
+			},
+			Format: tar.FormatPAX,
+		}
+		for _, e := range dirs {
+			hdr.Name = basePath + e.path[len(rootPath):]
+			hdr.ModTime = e.modTime
+			if err = tarWr.WriteHeader(&hdr); err != nil {
+				return w.CloseWithError(err)
+			}
+		}
+		hdr.Typeflag = tar.TypeReg
+		hdr.Mode = 0666
+		for _, e := range files {
+			hdr.Name = basePath + e.path[len(rootPath):]
+			hdr.ModTime = e.modTime
+			hdr.Size = e.size
+			if err = tarWr.WriteHeader(&hdr); err != nil {
+				return w.CloseWithError(err)
+			}
+			fd, err := os.Open(e.path)
+			if err != nil {
+				return w.CloseWithError(err)
+			}
+			if _, err = io.Copy(tarWr, bufio.NewReader(fd)); err != nil {
+				fd.Close()
+				return w.CloseWithError(err)
+			}
+			fd.Close()
+		}
+		if err = tarWr.Close(); err != nil {
+			return w.CloseWithError(err)
+		}
+		return w.Close()
+	}()
+	return
 }
 
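prepareTxFile has to report srcSize before a single tar byte exists, because Tx pre-computes expectedSize and checks spool space up front. The Walk callback therefore prices every entry in 512-byte tar blocks: one header block plus a two-block PAX record per entry, file data rounded up to a whole block, and two zero blocks terminating the archive. A worked check of that arithmetic, assuming a hypothetical layout of one directory holding one 700-byte file:

package main

import "fmt"

const TarBlockSize = 512

func main() {
	var size int64
	// directory entry: header block + PAX record (header block + content block)
	size += TarBlockSize + 2*TarBlockSize
	// file entry: header, PAX record, then content padded to a block boundary
	fileSize := int64(700)
	size += TarBlockSize + 2*TarBlockSize + fileSize
	if n := fileSize % TarBlockSize; n != 0 {
		size += TarBlockSize - n // zero padding: 700 -> 1024
	}
	size += 2 * TarBlockSize // archive terminator: two zero blocks
	fmt.Println(size)        // 5120
}
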
-func (ctx *Ctx) TxFileChunked(
+func (ctx *Ctx) TxFile(
 	node *Node,
 	nice uint8,
 	srcPath, dstPath string,
-	minSize int64,
-	chunkSize int64,
+	chunkSize, minSize, maxSize int64,
+	areaId *AreaId,
 ) error {
+	dstPathSpecified := false
 	if dstPath == "" {
 		if srcPath == "-" {
 			return errors.New("Must provide destination filename")
 		}
 		dstPath = filepath.Base(srcPath)
+	} else {
+		dstPathSpecified = true
 	}
 	dstPath = filepath.Clean(dstPath)
 	if filepath.IsAbs(dstPath) {
 		return errors.New("Relative destination path required")
 	}
-	reader, src, fileSize, err := prepareTxFile(srcPath)
-	if src != nil {
-		defer src.Close()
+	reader, closer, srcSize, archived, err := prepareTxFile(srcPath)
+	if closer != nil {
+		defer closer.Close()
 	}
 	if err != nil {
 		return err
 	}
+	if archived && !dstPathSpecified {
+		dstPath += TarExt
+	}
 
-	if fileSize <= chunkSize {
+	if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) {
 		pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath))
 		if err != nil {
 			return err
 		}
-		_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader)
-		sds := SDS{
-			"type": "file",
-			"node": node.Id,
-			"nice": strconv.Itoa(int(nice)),
-			"src":  srcPath,
-			"dst":  dstPath,
-			"size": strconv.FormatInt(fileSize, 10),
+		_, finalSize, err := ctx.Tx(
+			node, pkt, nice,
+			srcSize, minSize, maxSize,
+			bufio.NewReader(reader), dstPath, areaId,
+		)
+		les := LEs{
+			{"Type", "file"},
+			{"Node", node.Id},
+			{"Nice", int(nice)},
+			{"Src", srcPath},
+			{"Dst", dstPath},
+			{"Size", finalSize},
+		}
+		logMsg := func(les LEs) string {
+			return fmt.Sprintf(
+				"File %s (%s) is sent to %s:%s",
+				srcPath,
+				humanize.IBytes(uint64(finalSize)),
+				ctx.NodeName(node.Id),
+				dstPath,
+			)
 		}
 		if err == nil {
-			ctx.LogI("tx", sds, "sent")
+			ctx.LogI("tx", les, logMsg)
 		} else {
-			sds["err"] = err
-			ctx.LogE("tx", sds, "sent")
+			ctx.LogE("tx", les, err, logMsg)
 		}
 		return err
 	}
 
-	leftSize := fileSize
-	metaPkt := ChunkedMeta{
-		Magic:     MagicNNCPMv1,
-		FileSize:  uint64(fileSize),
-		ChunkSize: uint64(chunkSize),
-		Checksums: make([][32]byte, 0, (fileSize/chunkSize)+1),
-	}
-	for i := int64(0); i < (fileSize/chunkSize)+1; i++ {
-		hsh := new([32]byte)
-		metaPkt.Checksums = append(metaPkt.Checksums, *hsh)
-	}
-	var sizeToSend int64
-	var hsh hash.Hash
-	var pkt *Pkt
+	br := bufio.NewReader(reader)
+	var sizeFull int64
 	var chunkNum int
-	var path string
+	checksums := [][MTHSize]byte{}
 	for {
-		if leftSize <= chunkSize {
-			sizeToSend = leftSize
-		} else {
-			sizeToSend = chunkSize
-		}
-		path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
-		pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
-		if err != nil {
-			return err
-		}
-		hsh, err = blake2b.New256(nil)
+		lr := io.LimitReader(br, chunkSize)
+		path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum)
+		pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
 		if err != nil {
 			return err
 		}
-		_, err = ctx.Tx(
-			node,
-			pkt,
-			nice,
-			sizeToSend,
-			minSize,
-			io.TeeReader(reader, hsh),
+		hsh := MTHNew(0, 0)
+		_, size, err := ctx.Tx(
+			node, pkt, nice,
+			0, minSize, maxSize,
+			io.TeeReader(lr, hsh),
+			path, areaId,
 		)
-		sds := SDS{
-			"type": "file",
-			"node": node.Id,
-			"nice": strconv.Itoa(int(nice)),
-			"src":  srcPath,
-			"dst":  path,
-			"size": strconv.FormatInt(sizeToSend, 10),
+
+		les := LEs{
+			{"Type", "file"},
+			{"Node", node.Id},
+			{"Nice", int(nice)},
+			{"Src", srcPath},
+			{"Dst", path},
+			{"Size", size},
+		}
+		logMsg := func(les LEs) string {
+			return fmt.Sprintf(
+				"File %s (%s) is sent to %s:%s",
+				srcPath,
+				humanize.IBytes(uint64(size)),
+				ctx.NodeName(node.Id),
+				path,
+			)
 		}
 		if err == nil {
-			ctx.LogI("tx", sds, "sent")
+			ctx.LogI("tx", les, logMsg)
 		} else {
-			sds["err"] = err
-			ctx.LogE("tx", sds, "sent")
+			ctx.LogE("tx", les, err, logMsg)
 			return err
 		}
-		hsh.Sum(metaPkt.Checksums[chunkNum][:0])
-		leftSize -= sizeToSend
+
+		sizeFull += size - PktOverhead
+		var checksum [MTHSize]byte
+		hsh.Sum(checksum[:0])
+		checksums = append(checksums, checksum)
 		chunkNum++
-		if leftSize == 0 {
+		if size < chunkSize {
+			break
+		}
+		if _, err = br.Peek(1); err != nil {
 			break
 		}
 	}
-	var metaBuf bytes.Buffer
-	_, err = xdr.Marshal(&metaBuf, metaPkt)
+
+	metaPkt := ChunkedMeta{
+		Magic:     MagicNNCPMv2.B,
+		FileSize:  uint64(sizeFull),
+		ChunkSize: uint64(chunkSize),
+		Checksums: checksums,
+	}
+	var buf bytes.Buffer
+	_, err = xdr.Marshal(&buf, metaPkt)
 	if err != nil {
 		return err
 	}
-	path = dstPath + ChunkedSuffixMeta
-	pkt, err = NewPkt(PktTypeFile, nice, []byte(path))
+	path := dstPath + ChunkedSuffixMeta
+	pkt, err := NewPkt(PktTypeFile, nice, []byte(path))
 	if err != nil {
 		return err
 	}
-	metaPktSize := int64(metaBuf.Len())
-	_, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf)
-	sds := SDS{
-		"type": "file",
-		"node": node.Id,
-		"nice": strconv.Itoa(int(nice)),
-		"src":  srcPath,
-		"dst":  path,
-		"size": strconv.FormatInt(metaPktSize, 10),
+	metaPktSize := int64(buf.Len())
+	_, _, err = ctx.Tx(
+		node,
+		pkt,
+		nice,
+		metaPktSize, minSize, maxSize,
+		&buf, path, areaId,
+	)
+	les := LEs{
+		{"Type", "file"},
+		{"Node", node.Id},
+		{"Nice", int(nice)},
+		{"Src", srcPath},
+		{"Dst", path},
+		{"Size", metaPktSize},
+	}
+	logMsg := func(les LEs) string {
+		return fmt.Sprintf(
+			"File %s (%s) is sent to %s:%s",
+			srcPath,
+			humanize.IBytes(uint64(metaPktSize)),
+			ctx.NodeName(node.Id),
+			path,
+		)
 	}
 	if err == nil {
-		ctx.LogI("tx", sds, "sent")
+		ctx.LogI("tx", les, logMsg)
 	} else {
-		sds["err"] = err
-		ctx.LogE("tx", sds, "sent")
+		ctx.LogE("tx", les, err, logMsg)
 	}
 	return err
 }
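Because srcSize may be unknown (stdin, or a directory tarred on the fly), the chunking loop above cannot pre-compute the chunk count the way the removed code did. It cuts io.LimitReader windows off a single bufio.Reader and stops either when a chunk comes up short or when Peek(1) reports EOF right on a chunk boundary. A standalone sketch of just that termination logic, with a 5-byte chunk size:

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

func main() {
	const chunkSize = 5
	br := bufio.NewReader(strings.NewReader("hello world!"))
	for chunkNum := 0; ; chunkNum++ {
		data, err := io.ReadAll(io.LimitReader(br, chunkSize))
		if err != nil {
			panic(err)
		}
		fmt.Printf("chunk %d: %q\n", chunkNum, data)
		if int64(len(data)) < chunkSize {
			break // short read: source exhausted mid-chunk
		}
		if _, err := br.Peek(1); err != nil {
			break // source ended exactly on a chunk boundary
		}
	}
}
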
@@ -343,7 +577,8 @@ func (ctx *Ctx) TxFreq(
 	node *Node,
 	nice, replyNice uint8,
 	srcPath, dstPath string,
-	minSize int64) error {
+	minSize int64,
+) error {
 	dstPath = filepath.Clean(dstPath)
 	if filepath.IsAbs(dstPath) {
 		return errors.New("Relative destination path required")
@@ -358,20 +593,26 @@ func (ctx *Ctx) TxFreq(
 	}
 	src := strings.NewReader(dstPath)
 	size := int64(src.Len())
-	_, err = ctx.Tx(node, pkt, nice, size, minSize, src)
-	sds := SDS{
-		"type":      "freq",
-		"node":      node.Id,
-		"nice":      strconv.Itoa(int(nice)),
-		"replynice": strconv.Itoa(int(replyNice)),
-		"src":       srcPath,
-		"dst":       dstPath,
+	_, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil)
+	les := LEs{
+		{"Type", "freq"},
+		{"Node", node.Id},
+		{"Nice", int(nice)},
+		{"ReplyNice", int(replyNice)},
+		{"Src", srcPath},
+		{"Dst", dstPath},
+	}
+	logMsg := func(les LEs) string {
+		return fmt.Sprintf(
+			"File request from %s:%s to %s is sent",
+			ctx.NodeName(node.Id), srcPath,
+			dstPath,
+		)
 	}
 	if err == nil {
-		ctx.LogI("tx", sds, "sent")
+		ctx.LogI("tx", les, logMsg)
 	} else {
-		sds["err"] = err
-		ctx.LogE("tx", sds, "sent")
+		ctx.LogE("tx", les, err, logMsg)
 	}
 	return err
 }
@@ -381,68 +622,110 @@ func (ctx *Ctx) TxExec(
 	node *Node,
 	nice, replyNice uint8,
 	handle string,
 	args []string,
-	body []byte,
-	minSize int64,
+	in io.Reader,
+	minSize int64, maxSize int64,
+	noCompress bool,
+	areaId *AreaId,
 ) error {
 	path := make([][]byte, 0, 1+len(args))
 	path = append(path, []byte(handle))
 	for _, arg := range args {
 		path = append(path, []byte(arg))
 	}
-	pkt, err := NewPkt(PktTypeExec, replyNice, bytes.Join(path, []byte{0}))
-	if err != nil {
-		return err
+	pktType := PktTypeExec
+	if noCompress {
+		pktType = PktTypeExecFat
 	}
-	var compressed bytes.Buffer
-	compressor, err := zlib.NewWriterLevel(&compressed, zlib.BestCompression)
+	pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0}))
 	if err != nil {
 		return err
 	}
-	if _, err = io.Copy(compressor, bytes.NewReader(body)); err != nil {
-		return err
+	compressErr := make(chan error, 1)
+	if !noCompress {
+		pr, pw := io.Pipe()
+		compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
+		if err != nil {
+			return err
+		}
+		go func(r io.Reader) {
+			if _, err := io.Copy(compressor, r); err != nil {
+				compressErr <- err
+				pw.CloseWithError(err) // otherwise ctx.Tx would block reading the pipe forever
+				return
+			}
+			compressErr <- compressor.Close()
+			pw.Close()
+		}(in)
+		in = pr
+	}
+	_, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId)
+	if !noCompress {
+		e := <-compressErr
+		if err == nil {
+			err = e
+		}
 	}
-	compressor.Close()
-	size := int64(compressed.Len())
-	_, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed)
-	sds := SDS{
-		"type":      "exec",
-		"node":      node.Id,
-		"nice":      strconv.Itoa(int(nice)),
-		"replynice": strconv.Itoa(int(replyNice)),
-		"dst":       strings.Join(append([]string{handle}, args...), " "),
-		"size":      strconv.FormatInt(size, 10),
+	dst := strings.Join(append([]string{handle}, args...), " ")
+	les := LEs{
+		{"Type", "exec"},
+		{"Node", node.Id},
+		{"Nice", int(nice)},
+		{"ReplyNice", int(replyNice)},
+		{"Dst", dst},
+		{"Size", size},
+	}
+	logMsg := func(les LEs) string {
+		return fmt.Sprintf(
+			"Exec is sent to %s@%s (%s)",
+			ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)),
+		)
 	}
 	if err == nil {
-		ctx.LogI("tx", sds, "sent")
+		ctx.LogI("tx", les, logMsg)
 	} else {
-		sds["err"] = err
-		ctx.LogE("tx", sds, "sent")
+		ctx.LogE("tx", les, err, logMsg)
 	}
 	return err
 }
 
 func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {
-	sds := SDS{
-		"type": "trns",
-		"node": node.Id,
-		"nice": strconv.Itoa(int(nice)),
-		"size": strconv.FormatInt(size, 10),
+	les := LEs{
+		{"Type", "trns"},
+		{"Node", node.Id},
+		{"Nice", int(nice)},
+		{"Size", size},
+	}
+	logMsg := func(les LEs) string {
+		return fmt.Sprintf(
+			"Transitional packet to %s (%s) (nice %s)",
+			ctx.NodeName(node.Id),
+			humanize.IBytes(uint64(size)),
+			NicenessFmt(nice),
+		)
+	}
+	ctx.LogD("tx", les, logMsg)
+	if !ctx.IsEnoughSpace(size) {
+		err := errors.New("is not enough space")
+		ctx.LogE("tx", les, err, logMsg)
+		return err
 	}
-	ctx.LogD("tx", sds, "taken")
 	tmp, err := ctx.NewTmpFileWHash()
 	if err != nil {
 		return err
 	}
-	if _, err = io.Copy(tmp.W, src); err != nil {
+	if _, err = CopyProgressed(
+		tmp.W, src, "Tx trns",
+		LEs{{"Pkt", node.Id.String()}, {"FullSize", size}},
+		ctx.ShowPrgrs,
+	); err != nil {
 		return err
 	}
 	nodePath := filepath.Join(ctx.Spool, node.Id.String())
 	err = tmp.Commit(filepath.Join(nodePath, string(TTx)))
 	if err == nil {
-		ctx.LogI("tx", sds, "sent")
+		ctx.LogI("tx", les, logMsg)
 	} else {
-		sds["err"] = err
-		ctx.LogI("tx", sds, "sent")
+		ctx.LogE("tx", les, err, logMsg)
 	}
 	os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name))
 	return err
 }
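TxExec now streams instead of buffering: the removed zlib path compressed the whole body into memory before sending, while the zstd path pushes input through an io.Pipe whose read end is consumed by ctx.Tx, and the compressor's own error is joined in afterwards over a buffered channel. A minimal sketch of that arrangement, using only the klauspost/compress/zstd calls already imported by this file; io.ReadAll stands in for ctx.Tx as the pipe's consumer, and the pipe is closed with the error on failure, matching the comment added in the diff above:

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/klauspost/compress/zstd"
)

func main() {
	pr, pw := io.Pipe()
	compressErr := make(chan error, 1)
	go func(r io.Reader) {
		compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault))
		if err != nil {
			compressErr <- err
			pw.CloseWithError(err)
			return
		}
		if _, err := io.Copy(compressor, r); err != nil {
			compressErr <- err
			pw.CloseWithError(err) // keep the reader from blocking forever
			return
		}
		compressErr <- compressor.Close() // flush the zstd frame
		pw.Close()
	}(strings.NewReader("exec body to be compressed"))
	compressed, err := io.ReadAll(pr) // the consumer: ctx.Tx in the real code
	if err != nil {
		panic(err)
	}
	if err := <-compressErr; err != nil {
		panic(err)
	}
	fmt.Printf("%d compressed bytes\n", len(compressed))
}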