X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=blobdiff_plain;f=src%2Ftx.go;h=e38084e1a7a3c01390987af3da42c8667d2ee4ab;hp=6e13f2ef3fbeb9e4f97fe61b99d1897f6a8f6c09;hb=9eee562a50358d2f52449e80c8c47116d34f9415;hpb=ba16c17a029e3a1b2c836915b2473edcee259277 diff --git a/src/tx.go b/src/tx.go index 6e13f2e..e38084e 100644 --- a/src/tx.go +++ b/src/tx.go @@ -57,12 +57,12 @@ func (ctx *Ctx) Tx( src io.Reader, pktName string, areaId *AreaId, -) (*Node, int64, error) { +) (*Node, int64, string, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, 0, errors.New("area has no encryption keys") + return nil, 0, "", errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -82,15 +82,15 @@ func (ctx *Ctx) Tx( expectedSize += sizePadCalc(expectedSize, minSize, wrappers) expectedSize = PktEncOverhead + sizeWithTags(expectedSize) if maxSize != 0 && expectedSize > maxSize { - return nil, 0, TooBig + return nil, 0, "", TooBig } if !ctx.IsEnoughSpace(expectedSize) { - return nil, 0, errors.New("is not enough space") + return nil, 0, "", errors.New("is not enough space") } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, 0, err + return nil, 0, "", err } results := make(chan PktEncWriteResult) @@ -211,7 +211,7 @@ func (ctx *Ctx) Tx( r := <-results if r.err != nil { tmp.Fd.Close() - return nil, 0, r.err + return nil, 0, "", r.err } if r.pktEncRaw != nil { pktEncRaw = r.pktEncRaw @@ -224,7 +224,7 @@ func (ctx *Ctx) Tx( err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, 0, err + return lastNode, 0, "", err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -255,18 +255,18 @@ func (ctx *Ctx) Tx( } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, payloadSize, err + return lastNode, payloadSize, tmp.Checksum(), err } type DummyCloser struct{} @@ -438,7 +438,7 @@ func (ctx *Ctx) TxFile( if err != nil { return err } - _, finalSize, err := ctx.Tx( + _, finalSize, pktName, err := ctx.Tx( node, pkt, nice, srcSize, minSize, maxSize, bufio.NewReader(reader), dstPath, areaId, @@ -450,6 +450,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", dstPath}, {"Size", finalSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -480,7 +481,7 @@ func (ctx *Ctx) TxFile( return err } hsh := MTHNew(0, 0) - _, size, err := ctx.Tx( + _, size, pktName, err := ctx.Tx( node, pkt, nice, 0, minSize, maxSize, io.TeeReader(lr, hsh), @@ -494,6 +495,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -541,7 +543,7 @@ func (ctx *Ctx) TxFile( return err } metaPktSize := int64(buf.Len()) - _, _, err = ctx.Tx( + _, _, pktName, err := ctx.Tx( node, pkt, nice, @@ -555,6 +557,7 @@ func (ctx *Ctx) TxFile( {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -593,7 +596,9 @@ func (ctx *Ctx) TxFreq( } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + _, _, pktName, err := ctx.Tx( + node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil, + ) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -601,6 +606,7 @@ func (ctx *Ctx) TxFreq( {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -657,7 +663,9 @@ func (ctx *Ctx) TxExec( }(in) in = pr } - _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + _, size, pktName, err := ctx.Tx( + node, pkt, nice, 0, minSize, maxSize, in, handle, areaId, + ) if !noCompress { e := <-compressErr if err == nil { @@ -672,6 +680,7 @@ func (ctx *Ctx) TxExec( {"ReplyNice", int(replyNice)}, {"Dst", dst}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -735,25 +744,28 @@ func (ctx *Ctx) TxACK( nice uint8, hsh string, minSize int64, -) error { +) (pktName string, err error) { hshRaw, err := Base32Codec.DecodeString(hsh) if err != nil { - return err + return "", err } if len(hshRaw) != MTHSize { - return errors.New("Invalid packet id size") + return "", errors.New("Invalid packet id size") } pkt, err := NewPkt(PktTypeACK, nice, []byte(hshRaw)) if err != nil { - return err + return "", err } src := bytes.NewReader([]byte{}) - _, _, err = ctx.Tx(node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil) + _, _, pktName, err = ctx.Tx( + node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil, + ) les := LEs{ {"Type", "ack"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Pkt", hsh}, + {"NewPkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh) @@ -763,5 +775,5 @@ func (ctx *Ctx) TxACK( } else { ctx.LogE("tx", les, err, logMsg) } - return err + return }