diff --git a/src/tx.go b/src/tx.go
index 15cf08a..a90a49e 100644
--- a/src/tx.go
+++ b/src/tx.go
@@ -36,6 +36,7 @@ import (
 	xdr "github.com/davecgh/go-xdr/xdr2"
 	"github.com/dustin/go-humanize"
 	"github.com/klauspost/compress/zstd"
+	"golang.org/x/crypto/blake2b"
 	"golang.org/x/crypto/chacha20poly1305"
 )
 
@@ -53,7 +54,15 @@ func (ctx *Ctx) Tx(
 	size, minSize int64,
 	src io.Reader,
 	pktName string,
+	areaId *AreaId,
 ) (*Node, error) {
+	var area *Area
+	if areaId != nil {
+		area = ctx.AreaId2Area[*areaId]
+		if area.Prv == nil {
+			return nil, errors.New("unknown area id")
+		}
+	}
 	hops := make([]*Node, 0, 1+len(node.Via))
 	hops = append(hops, node)
 	lastNode := node
@@ -62,7 +71,11 @@ func (ctx *Ctx) Tx(
 		hops = append(hops, lastNode)
 	}
 	expectedSize := size
-	for i := 0; i < len(hops); i++ {
+	wrappers := len(hops)
+	if area != nil {
+		wrappers++
+	}
+	for i := 0; i < wrappers; i++ {
 		expectedSize = PktEncOverhead +
 			PktSizeOverhead +
 			sizeWithTags(PktOverhead+expectedSize)
@@ -83,32 +96,84 @@
 	pktEncRaws := make(chan []byte)
 	curSize := size
 	pipeR, pipeW := io.Pipe()
-	go func(size int64, src io.Reader, dst io.WriteCloser) {
-		ctx.LogD("tx", LEs{
-			{"Node", hops[0].Id},
-			{"Nice", int(nice)},
-			{"Size", size},
-		}, func(les LEs) string {
-			return fmt.Sprintf(
-				"Tx packet to %s (%s) nice: %s",
-				ctx.NodeName(hops[0].Id),
-				humanize.IBytes(uint64(size)),
-				NicenessFmt(nice),
-			)
-		})
-		pktEncRaw, err := PktEncWrite(
-			ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
-		)
-		pktEncRaws <- pktEncRaw
-		errs <- err
-		dst.Close() // #nosec G104
-	}(curSize, src, pipeW)
-	curSize = PktEncOverhead +
-		PktSizeOverhead +
-		sizeWithTags(PktOverhead+curSize) +
-		padSize
-	var pipeRPrev io.Reader
+	if area == nil {
+		go func(size int64, src io.Reader, dst io.WriteCloser) {
+			ctx.LogD("tx", LEs{
+				{"Node", hops[0].Id},
+				{"Nice", int(nice)},
+				{"Size", size},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx packet to %s (%s) nice: %s",
+					ctx.NodeName(hops[0].Id),
+					humanize.IBytes(uint64(size)),
+					NicenessFmt(nice),
+				)
+			})
+			pktEncRaw, err := PktEncWrite(
+				ctx.Self, hops[0], pkt, nice, size, padSize, src, dst,
+			)
+			pktEncRaws <- pktEncRaw
+			errs <- err
+			dst.Close() // #nosec G104
+		}(curSize, src, pipeW)
+		curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+		curSize += padSize
+	} else {
+		go func(size, padSize int64, src io.Reader, dst io.WriteCloser) {
+			ctx.LogD("tx", LEs{
+				{"Area", area.Id},
+				{"Nice", int(nice)},
+				{"Size", size},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx area packet to %s (%s) nice: %s",
+					ctx.AreaName(areaId),
+					humanize.IBytes(uint64(size)),
+					NicenessFmt(nice),
+				)
+			})
+			areaNode := Node{Id: new(NodeId), ExchPub: new([32]byte)}
+			copy(areaNode.Id[:], area.Id[:])
+			copy(areaNode.ExchPub[:], area.Pub[:])
+			pktEncRaw, err := PktEncWrite(
+				ctx.Self, &areaNode, pkt, nice, size, padSize, src, dst,
+			)
+			pktEncRaws <- pktEncRaw
+			errs <- err
+			dst.Close() // #nosec G104
+		}(curSize, padSize, src, pipeW)
+		curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+		curSize += padSize
+		pipeRPrev = pipeR
+		pipeR, pipeW = io.Pipe()
+		go func(size int64, src io.Reader, dst io.WriteCloser) {
+			pktArea, err := NewPkt(PktTypeArea, 0, area.Id[:])
+			if err != nil {
+				panic(err)
+			}
+			ctx.LogD("tx", LEs{
+				{"Node", hops[0].Id},
+				{"Nice", int(nice)},
+				{"Size", size},
+			}, func(les LEs) string {
+				return fmt.Sprintf(
+					"Tx packet to %s (%s) nice: %s",
+					ctx.NodeName(hops[0].Id),
+					humanize.IBytes(uint64(size)),
+					NicenessFmt(nice),
+				)
+			})
+			pktEncRaw, err := PktEncWrite(
+				ctx.Self, hops[0], pktArea, nice, size, 0, src, dst,
+			)
+			pktEncRaws <- pktEncRaw
+			errs <- err
+			dst.Close() // #nosec G104
+		}(curSize, pipeRPrev, pipeW)
+		curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize)
+	}
 	for i := 1; i < len(hops); i++ {
 		pktTrns, err := NewPkt(PktTypeTrns, 0, hops[i-1].Id[:])
 		if err != nil {
@@ -145,10 +210,14 @@
 		errs <- err
 	}()
 	var pktEncRaw []byte
+	var pktEncMsg []byte
+	if area != nil {
+		pktEncMsg = <-pktEncRaws
+	}
 	for i := 0; i < len(hops); i++ {
 		pktEncRaw = <-pktEncRaws
 	}
-	for i := 0; i <= len(hops); i++ {
+	for i := 0; i <= wrappers; i++ {
 		err = <-errs
 		if err != nil {
 			tmp.Fd.Close() // #nosec G104
@@ -164,6 +233,43 @@
 	if ctx.HdrUsage {
 		ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum()))
 	}
+	if area != nil {
+		msgHashRaw := blake2b.Sum256(pktEncMsg)
+		msgHash := Base32Codec.EncodeToString(msgHashRaw[:])
+		seenDir := filepath.Join(
+			ctx.Spool, ctx.SelfId.String(), AreaDir, areaId.String(),
+		)
+		seenPath := filepath.Join(seenDir, msgHash+SeenSuffix)
+		les := LEs{
+			{"Node", node.Id},
+			{"Nice", int(nice)},
+			{"Size", size},
+			{"Area", areaId},
+			{"AreaMsg", msgHash},
+		}
+		logMsg := func(les LEs) string {
+			return fmt.Sprintf(
+				"Tx area packet to %s (%s) nice: %s, area %s: %s",
+				ctx.NodeName(node.Id),
+				humanize.IBytes(uint64(size)),
+				NicenessFmt(nice),
+				area.Name,
+				msgHash,
+			)
+		}
+		if err = os.MkdirAll(seenDir, os.FileMode(0777)); err != nil {
+			ctx.LogE("tx-mkdir", les, err, logMsg)
+			return lastNode, err
+		}
+		if fd, err := os.Create(seenPath); err == nil {
+			fd.Close()
+			if err = DirSync(seenDir); err != nil {
+				ctx.LogE("tx-dirsync", les, err, logMsg)
+				return lastNode, err
+			}
+		}
+		ctx.LogI("tx-area", les, logMsg)
+	}
 	return lastNode, err
 }
 
@@ -353,6 +459,7 @@ func (ctx *Ctx) TxFile(
 	srcPath, dstPath string,
 	chunkSize int64,
 	minSize, maxSize int64,
+	areaId *AreaId,
 ) error {
 	dstPathSpecified := false
 	if dstPath == "" {
@@ -386,7 +493,7 @@ func (ctx *Ctx) TxFile(
 		if err != nil {
 			return err
 		}
-		_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath)
+		_, err = ctx.Tx(node, pkt, nice, fileSize, minSize, reader, dstPath, areaId)
 		les := LEs{
 			{"Type", "file"},
 			{"Node", node.Id},
@@ -448,6 +555,7 @@ func (ctx *Ctx) TxFile(
 			minSize,
 			io.TeeReader(reader, hsh),
 			path,
+			areaId,
 		)
 		les := LEs{
 			{"Type", "file"},
@@ -490,7 +598,7 @@ func (ctx *Ctx) TxFile(
 		return err
 	}
 	metaPktSize := int64(metaBuf.Len())
-	_, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path)
+	_, err = ctx.Tx(node, pkt, nice, metaPktSize, minSize, &metaBuf, path, areaId)
 	les := LEs{
 		{"Type", "file"},
 		{"Node", node.Id},
@@ -535,7 +643,7 @@ func (ctx *Ctx) TxFreq(
 	}
 	src := strings.NewReader(dstPath)
 	size := int64(src.Len())
-	_, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath)
+	_, err = ctx.Tx(node, pkt, nice, size, minSize, src, srcPath, nil)
 	les := LEs{
 		{"Type", "freq"},
 		{"Node", node.Id},
@@ -568,6 +676,7 @@ func (ctx *Ctx) TxExec(
 	minSize int64,
 	useTmp bool,
 	noCompress bool,
+	areaId *AreaId,
 ) error {
 	path := make([][]byte, 0, 1+len(args))
 	path = append(path, []byte(handle))
@@ -601,7 +710,7 @@ func (ctx *Ctx) TxExec(
 			return err
 		}
 		size = int64(compressed.Len())
-		_, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle)
+		_, err = ctx.Tx(node, pkt, nice, size, minSize, &compressed, handle, areaId)
 	}
 	if noCompress && !useTmp {
 		var data bytes.Buffer
@@ -609,7 +718,7 @@
 			return err
 		}
 		size = int64(data.Len())
-		_, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle)
+		_, err = ctx.Tx(node, pkt, nice, size, minSize, &data, handle, areaId)
 	}
 	if !noCompress && useTmp {
 		r, w := io.Pipe()
@@ -640,7 +749,7 @@
 			return err
 		}
 		size = fileSize
-		_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
+		_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
 	}
 	if noCompress && useTmp {
 		tmpReader, closer, fileSize, err := throughTmpFile(in)
@@ -651,7 +760,7 @@
 			return err
 		}
 		size = fileSize
-		_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle)
+		_, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle, areaId)
 	}
 	dst := strings.Join(append([]string{handle}, args...), " ")
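
Reviewer note, not part of the upstream diff: the new trailing "areaId *AreaId" argument is optional everywhere it is added. Passing nil keeps the old point-to-point behaviour, while a non-nil id makes Tx() encrypt the payload once more to the area's own keypair (the synthetic areaNode above), wrap the result in a PktTypeArea packet addressed to the first hop, and record a blake2b-derived "seen" marker under the spool. Note also that TxFreq passes nil unconditionally in this diff, so file requests are never sent through an area. The sketch below only illustrates how a caller might resolve an area by name before handing its id to TxFile; it assumes it lives in the same package, that TxFile's leading parameters are (node *Node, nice uint8, ...) as upstream, and the helper names findAreaId/txFileMaybeArea are invented for the example.

	// findAreaId is a hypothetical helper: it looks up a configured area
	// by its human-readable name and returns a pointer to its id, or nil
	// when no area name was given at all.
	func findAreaId(ctx *Ctx, name string) (*AreaId, error) {
		if name == "" {
			return nil, nil // no area requested: plain point-to-point transfer
		}
		for id, area := range ctx.AreaId2Area {
			if area.Name == name {
				id := id // copy, so the returned pointer does not alias the loop variable
				return &id, nil
			}
		}
		return nil, errors.New("no such area: " + name)
	}

	// txFileMaybeArea shows the intended call shape: the same TxFile entry
	// point serves both direct and area (multicast) transmissions.
	func txFileMaybeArea(
		ctx *Ctx, node *Node, nice uint8,
		srcPath, dstPath string,
		chunkSize, minSize, maxSize int64,
		areaName string,
	) error {
		areaId, err := findAreaId(ctx, areaName)
		if err != nil {
			return err
		}
		return ctx.TxFile(node, nice, srcPath, dstPath, chunkSize, minSize, maxSize, areaId)
	}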