X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=blobdiff_plain;f=src%2Ftx.go;h=0f7d2b8d9369627d4cbf52eb63b996b5874a22a5;hp=e096ed65f873ddeb48f9ac635a01bbeea8849ba2;hb=b47dbfe6687569650fa544a4ecf3e4ea388390cb;hpb=99c6aa8d7e77b37c58d9dc0c8f3f8ed6a94352f5 diff --git a/src/tx.go b/src/tx.go index e096ed6..0f7d2b8 100644 --- a/src/tx.go +++ b/src/tx.go @@ -21,12 +21,9 @@ import ( "archive/tar" "bufio" "bytes" - "crypto/rand" "errors" "fmt" - "hash" "io" - "io/ioutil" "os" "path/filepath" "strconv" @@ -37,7 +34,6 @@ import ( "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" - "golang.org/x/crypto/chacha20poly1305" ) const ( @@ -47,20 +43,26 @@ const ( TarExt = ".tar" ) +type PktEncWriteResult struct { + pktEncRaw []byte + size int64 + err error +} + func (ctx *Ctx) Tx( node *Node, pkt *Pkt, nice uint8, - size, minSize int64, + srcSize, minSize, maxSize int64, src io.Reader, pktName string, areaId *AreaId, -) (*Node, error) { +) (*Node, int64, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, errors.New("area has no encryption keys") + return nil, 0, errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -70,85 +72,76 @@ func (ctx *Ctx) Tx( lastNode = ctx.Neigh[*node.Via[i-1]] hops = append(hops, lastNode) } - expectedSize := size wrappers := len(hops) if area != nil { wrappers++ } - for i := 0; i < wrappers; i++ { - expectedSize = PktEncOverhead + - PktSizeOverhead + - sizeWithTags(PktOverhead+expectedSize) - } - padSize := minSize - expectedSize - if padSize < 0 { - padSize = 0 - } - if !ctx.IsEnoughSpace(size + padSize) { - return nil, errors.New("is not enough space") + var expectedSize int64 + if srcSize > 0 { + expectedSize = srcSize + PktOverhead + expectedSize += sizePadCalc(expectedSize, minSize, wrappers) + expectedSize = PktEncOverhead + sizeWithTags(expectedSize) + if maxSize != 0 && expectedSize > maxSize { + return nil, 0, TooBig + } + if !ctx.IsEnoughSpace(expectedSize) { + return nil, 0, errors.New("is not enough space") + } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, err + return nil, 0, err } - errs := make(chan error) - pktEncRaws := make(chan []byte) - curSize := size + results := make(chan PktEncWriteResult) pipeR, pipeW := io.Pipe() var pipeRPrev io.Reader if area == nil { - go func(size int64, src io.Reader, dst io.WriteCloser) { + go func(src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, }, func(les LEs) string { return fmt.Sprintf( - "Tx packet to %s (%s) nice: %s", + "Tx packet to %s (source %s) nice: %s", ctx.NodeName(hops[0].Id), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), ) }) - pktEncRaw, err := PktEncWrite( - ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + pktEncRaw, size, err := PktEncWrite( + ctx.Self, hops[0], pkt, nice, minSize, maxSize, wrappers, src, dst, ) - pktEncRaws <- pktEncRaw - errs <- err + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) - curSize += padSize + }(src, pipeW) } else { - go func(size, padSize int64, src io.Reader, dst io.WriteCloser) { + go func(src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Area", area.Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, }, func(les LEs) string { return fmt.Sprintf( - "Tx area packet to %s (%s) nice: %s", + "Tx area packet to %s (source %s) nice: %s", ctx.AreaName(areaId), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), ) }) areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)} copy(areaNode.Id[:], area.Id[:]) copy(areaNode.ExchPub[:], area.Pub[:]) - pktEncRaw, err := PktEncWrite( - ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst, + pktEncRaw, size, err := PktEncWrite( + ctx.Self, &areaNode, pkt, nice, 0, maxSize, 0, src, dst, ) - pktEncRaws <- pktEncRaw - errs <- err + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(curSize, padSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) - curSize += padSize + }(src, pipeW) pipeRPrev = pipeR pipeR, pipeW = io.Pipe() - go func(size int64, src io.Reader, dst io.WriteCloser) { + go func(src io.Reader, dst io.WriteCloser) { pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:]) if err != nil { panic(err) @@ -156,23 +149,21 @@ func (ctx *Ctx) Tx( ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, }, func(les LEs) string { return fmt.Sprintf( - "Tx packet to %s (%s) nice: %s", + "Tx packet to %s (source %s) nice: %s", ctx.NodeName(hops[0].Id), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), ) }) - pktEncRaw, err := PktEncWrite( - ctx.Self, hops[0], pktArea, nice, size, 0, src, dst, + pktEncRaw, size, err := PktEncWrite( + ctx.Self, hops[0], pktArea, nice, minSize, maxSize, wrappers, src, dst, ) - pktEncRaws <- pktEncRaw - errs <- err + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(curSize, pipeRPrev, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + }(pipeRPrev, pipeW) } for i := 1; i < len(hops); i++ { pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:]) @@ -181,54 +172,54 @@ func (ctx *Ctx) Tx( } pipeRPrev = pipeR pipeR, pipeW = io.Pipe() - go func(node *Node, pkt *Pkt, size int64, src io.Reader, dst io.WriteCloser) { + go func(node *Node, pkt *Pkt, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", node.Id}, {"Nice", int(nice)}, - {"Size", size}, }, func(les LEs) string { return fmt.Sprintf( - "Tx trns packet to %s (%s) nice: %s", + "Tx trns packet to %s nice: %s", ctx.NodeName(node.Id), - humanize.IBytes(uint64(size)), NicenessFmt(nice), ) }) - pktEncRaw, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) - pktEncRaws <- pktEncRaw - errs <- err + pktEncRaw, size, err := PktEncWrite( + ctx.Self, node, pkt, nice, 0, MaxFileSize, 0, src, dst, + ) + results <- PktEncWriteResult{pktEncRaw, size, err} dst.Close() - }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + }(hops[i], pktTrns, pipeRPrev, pipeW) } go func() { _, err := CopyProgressed( tmp.W, pipeR, "Tx", - LEs{{"Pkt", pktName}, {"FullSize", curSize}}, + LEs{{"Pkt", pktName}, {"FullSize", expectedSize}}, ctx.ShowPrgrs, ) - errs <- err + results <- PktEncWriteResult{err: err} }() var pktEncRaw []byte var pktEncMsg []byte if area != nil { - pktEncMsg = <-pktEncRaws - } - for i := 0; i < len(hops); i++ { - pktEncRaw = <-pktEncRaws + pktEncMsg = (<-results).pktEncRaw } + var finalSize int64 for i := 0; i <= wrappers; i++ { - err = <-errs - if err != nil { + r := <-results + if r.err != nil { tmp.Fd.Close() - return nil, err + return nil, 0, err + } + if r.pktEncRaw != nil { + finalSize = r.size + pktEncRaw = r.pktEncRaw } } nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, err + return lastNode, 0, err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -243,15 +234,15 @@ func (ctx *Ctx) Tx( les := LEs{ {"Node", node.Id}, {"Nice", int(nice)}, - {"Size", size}, + {"Size", expectedSize}, {"Area", areaId}, {"AreaMsg", msgHash}, } logMsg := func(les LEs) string { return fmt.Sprintf( - "Tx area packet to %s (%s) nice: %s, area %s: %s", + "Tx area packet to %s (source %s) nice: %s, area %s: %s", ctx.NodeName(node.Id), - humanize.IBytes(uint64(size)), + humanize.IBytes(uint64(expectedSize)), NicenessFmt(nice), area.Name, msgHash, @@ -259,84 +250,34 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, err + return lastNode, 0, err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, err + return lastNode, 0, err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, err + return lastNode, finalSize, err } type DummyCloser struct{} func (dc DummyCloser) Close() error { return nil } -func throughTmpFile(r io.Reader) ( - reader io.Reader, - closer io.Closer, - fileSize int64, - rerr error, -) { - src, err := ioutil.TempFile("", "nncp-file") - if err != nil { - rerr = err - return - } - os.Remove(src.Name()) - tmpW := bufio.NewWriter(src) - tmpKey := make([]byte, chacha20poly1305.KeySize) - if _, rerr = rand.Read(tmpKey[:]); rerr != nil { - return - } - aead, err := chacha20poly1305.New(tmpKey) - if err != nil { - rerr = err - return - } - nonce := make([]byte, aead.NonceSize()) - written, err := aeadProcess(aead, nonce, nil, true, r, tmpW) - if err != nil { - rerr = err - return - } - fileSize = int64(written) - if err = tmpW.Flush(); err != nil { - rerr = err - return - } - if _, err = src.Seek(0, io.SeekStart); err != nil { - rerr = err - return - } - r, w := io.Pipe() - go func() { - for i := 0; i < aead.NonceSize(); i++ { - nonce[i] = 0 - } - if _, err := aeadProcess(aead, nonce, nil, false, bufio.NewReader(src), w); err != nil { - w.CloseWithError(err) - } - }() - reader = r - closer = src - return -} - func prepareTxFile(srcPath string) ( reader io.Reader, closer io.Closer, - fileSize int64, + srcSize int64, archived bool, rerr error, ) { if srcPath == "-" { - reader, closer, fileSize, rerr = throughTmpFile(bufio.NewReader(os.Stdin)) + reader = os.Stdin + closer = os.Stdin return } @@ -354,9 +295,9 @@ func prepareTxFile(srcPath string) ( rerr = err return } - fileSize = srcStat.Size() - reader = bufio.NewReader(src) + reader = src closer = src + srcSize = srcStat.Size() return } @@ -386,13 +327,13 @@ func prepareTxFile(srcPath string) ( } if info.IsDir() { // directory header, PAX record header+contents - fileSize += TarBlockSize + 2*TarBlockSize + srcSize += TarBlockSize + 2*TarBlockSize dirs = append(dirs, einfo{path: path, modTime: info.ModTime()}) } else { // file header, PAX record header+contents, file content - fileSize += TarBlockSize + 2*TarBlockSize + info.Size() + srcSize += TarBlockSize + 2*TarBlockSize + info.Size() if n := info.Size() % TarBlockSize; n != 0 { - fileSize += TarBlockSize - n // padding + srcSize += TarBlockSize - n // padding } files = append(files, einfo{ path: path, @@ -409,7 +350,7 @@ func prepareTxFile(srcPath string) ( r, w := io.Pipe() reader = r closer = DummyCloser{} - fileSize += 2 * TarBlockSize // termination block + srcSize += 2 * TarBlockSize // termination block go func() error { tarWr := tar.NewWriter(w) @@ -460,8 +401,7 @@ func (ctx *Ctx) TxFile( node *Node, nice uint8, srcPath, dstPath string, - chunkSize int64, - minSize, maxSize int64, + chunkSize, minSize, maxSize int64, areaId *AreaId, ) error { dstPathSpecified := false @@ -477,39 +417,40 @@ func (ctx *Ctx) TxFile( if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") } - reader, closer, fileSize, archived, err := prepareTxFile(srcPath) + reader, closer, srcSize, archived, err := prepareTxFile(srcPath) if closer != nil { defer closer.Close() } if err != nil { return err } - if fileSize > maxSize { - return errors.New("Too big than allowed") - } if archived && !dstPathSpecified { dstPath += TarExt } - if fileSize <= chunkSize { + if chunkSize == 0 || (srcSize > 0 && srcSize <= chunkSize) { pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) if err != nil { return err } - _, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId) + _, finalSize, err := ctx.Tx( + node, pkt, nice, + srcSize, minSize, maxSize, + bufio.NewReader(reader), dstPath, areaId, + ) les := LEs{ {"Type", "file"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", dstPath}, - {"Size", fileSize}, + {"Size", finalSize}, } logMsg := func(les LEs) string { return fmt.Sprintf( "File %s (%s) sent to %s:%s", srcPath, - humanize.IBytes(uint64(fileSize)), + humanize.IBytes(uint64(finalSize)), ctx.NodeName(node.Id), dstPath, ) @@ -522,57 +463,38 @@ func (ctx *Ctx) TxFile( return err } - leftSize := fileSize - metaPkt := ChunkedMeta{ - Magic: MagicNNCPMv2.B, - FileSize: uint64(fileSize), - ChunkSize: uint64(chunkSize), - Checksums: make([][MTHSize]byte, 0, (fileSize/chunkSize)+1), - } - for i := int64(0); i < (fileSize/chunkSize)+1; i++ { - hsh := new([MTHSize]byte) - metaPkt.Checksums = append(metaPkt.Checksums, *hsh) - } - var sizeToSend int64 - var hsh hash.Hash - var pkt *Pkt + br := bufio.NewReader(reader) + var sizeFull int64 var chunkNum int - var path string + checksums := [][MTHSize]byte{} for { - if leftSize <= chunkSize { - sizeToSend = leftSize - } else { - sizeToSend = chunkSize - } - path = dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) - pkt, err = NewPkt(PktTypeFile, nice, []byte(path)) + lr := io.LimitReader(br, chunkSize) + path := dstPath + ChunkedSuffixPart + strconv.Itoa(chunkNum) + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { return err } - hsh = MTHNew(0, 0) - _, err = ctx.Tx( - node, - pkt, - nice, - sizeToSend, - minSize, - io.TeeReader(reader, hsh), - path, - areaId, + hsh := MTHNew(0, 0) + _, size, err := ctx.Tx( + node, pkt, nice, + 0, minSize, maxSize, + io.TeeReader(lr, hsh), + path, areaId, ) + les := LEs{ {"Type", "file"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", path}, - {"Size", sizeToSend}, + {"Size", size}, } logMsg := func(les LEs) string { return fmt.Sprintf( "File %s (%s) sent to %s:%s", srcPath, - humanize.IBytes(uint64(sizeToSend)), + humanize.IBytes(uint64(size)), ctx.NodeName(node.Id), path, ) @@ -583,25 +505,44 @@ func (ctx *Ctx) TxFile( ctx.LogE("tx", les, err, logMsg) return err } - hsh.Sum(metaPkt.Checksums[chunkNum][:0]) - leftSize -= sizeToSend + + sizeFull += size - PktOverhead + var checksum [MTHSize]byte + hsh.Sum(checksum[:0]) + checksums = append(checksums, checksum) chunkNum++ - if leftSize == 0 { + if size < chunkSize { + break + } + if _, err = br.Peek(1); err != nil { break } } - var metaBuf bytes.Buffer - _, err = xdr.Marshal(&metaBuf, metaPkt) + + metaPkt := ChunkedMeta{ + Magic: MagicNNCPMv2.B, + FileSize: uint64(sizeFull), + ChunkSize: uint64(chunkSize), + Checksums: checksums, + } + var buf bytes.Buffer + _, err = xdr.Marshal(&buf, metaPkt) if err != nil { return err } - path = dstPath + ChunkedSuffixMeta - pkt, err = NewPkt(PktTypeFile, nice, []byte(path)) + path := dstPath + ChunkedSuffixMeta + pkt, err := NewPkt(PktTypeFile, nice, []byte(path)) if err != nil { return err } - metaPktSize := int64(metaBuf.Len()) - _, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId) + metaPktSize := int64(buf.Len()) + _, _, err = ctx.Tx( + node, + pkt, + nice, + metaPktSize, minSize, maxSize, + &buf, path, areaId, + ) les := LEs{ {"Type", "file"}, {"Node", node.Id}, @@ -631,7 +572,8 @@ func (ctx *Ctx) TxFreq( node *Node, nice, replyNice uint8, srcPath, dstPath string, - minSize int64) error { + minSize int64, +) error { dstPath = filepath.Clean(dstPath) if filepath.IsAbs(dstPath) { return errors.New("Relative destination path required") @@ -646,7 +588,7 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil) + _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -676,8 +618,7 @@ func (ctx *Ctx) TxExec( handle string, args []string, in io.Reader, - minSize int64, - useTmp bool, + minSize int64, maxSize int64, noCompress bool, areaId *AreaId, ) error { @@ -690,82 +631,34 @@ func (ctx *Ctx) TxExec( if noCompress { pktType = PktTypeExecFat } - pkt, rerr := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) - if rerr != nil { - return rerr - } - var size int64 - - if !noCompress && !useTmp { - var compressed bytes.Buffer - compressor, err := zstd.NewWriter( - &compressed, - zstd.WithEncoderLevel(zstd.SpeedDefault), - ) - if err != nil { - return err - } - if _, err = io.Copy(compressor, in); err != nil { - compressor.Close() - return err - } - if err = compressor.Close(); err != nil { - return err - } - size = int64(compressed.Len()) - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId) - } - if noCompress && !useTmp { - var data bytes.Buffer - if _, err := io.Copy(&data, in); err != nil { - return err - } - size = int64(data.Len()) - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId) + pkt, err := NewPkt(pktType, replyNice, bytes.Join(path, []byte{0})) + if err != nil { + return err } - if !noCompress && useTmp { - r, w := io.Pipe() - compressor, err := zstd.NewWriter(w, zstd.WithEncoderLevel(zstd.SpeedDefault)) + compressErr := make(chan error, 1) + if !noCompress { + pr, pw := io.Pipe() + compressor, err := zstd.NewWriter(pw, zstd.WithEncoderLevel(zstd.SpeedDefault)) if err != nil { return err } - copyErr := make(chan error) - go func() { - _, err := io.Copy(compressor, in) - if err != nil { - compressor.Close() - copyErr <- err + go func(r io.Reader) { + if _, err := io.Copy(compressor, r); err != nil { + compressErr <- err + return } - err = compressor.Close() - w.Close() - copyErr <- err - }() - tmpReader, closer, fileSize, err := throughTmpFile(r) - if closer != nil { - defer closer.Close() - } - if err != nil { - return err - } - err = <-copyErr - if err != nil { - return err - } - size = fileSize - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId) - } - if noCompress && useTmp { - tmpReader, closer, fileSize, err := throughTmpFile(in) - if closer != nil { - defer closer.Close() - } - if err != nil { - return err + compressErr <- compressor.Close() + pw.Close() + }(in) + in = pr + } + _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + if !noCompress { + e := <-compressErr + if err == nil { + err = e } - size = fileSize - _, rerr = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId) } - dst := strings.Join(append([]string{handle}, args...), " ") les := LEs{ {"Type", "exec"}, @@ -781,12 +674,12 @@ func (ctx *Ctx) TxExec( ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), ) } - if rerr == nil { + if err == nil { ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, rerr, logMsg) + ctx.LogE("tx", les, err, logMsg) } - return rerr + return err } func (ctx *Ctx) TxTrns(node *Node, nice uint8, size int64, src io.Reader) error {