X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=c0aa7bfed2fb3210c42dcff4d71e8494ffcd3ab3;hb=4e08a1c97600e0372680e86a651f916c70e89342;hp=05b5a3a6a51cde6862f3f5344bf2d480681f6c1c;hpb=271870ad4f56253e0918f673b90615e4749cf201;p=nncp.git diff --git a/src/toss.go b/src/toss.go index 05b5a3a..c0aa7bf 100644 --- a/src/toss.go +++ b/src/toss.go @@ -68,6 +68,12 @@ func (ctx *Ctx) Toss( nice uint8, dryRun, doSeen, noFile, noFreq, noExec, noTrns bool, ) bool { + dirLock, err := ctx.LockDir(nodeId, "toss") + if err != nil { + ctx.LogE("rx", SDS{}, err, "lock") + return false + } + defer ctx.UnlockDir(dirLock) isBad := false sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] decompressor, err := zstd.NewReader(nil) @@ -83,8 +89,7 @@ func (ctx *Ctx) Toss( continue } pipeR, pipeW := io.Pipe() - errs := make(chan error, 1) - go func(job Job) { + go func(job Job) error { pipeWB := bufio.NewWriter(pipeW) _, _, err := PktEncRead( ctx.Self, @@ -92,13 +97,16 @@ func (ctx *Ctx) Toss( bufio.NewReader(job.Fd), pipeWB, ) - errs <- err - pipeWB.Flush() - pipeW.Close() - job.Fd.Close() + job.Fd.Close() // #nosec G104 if err != nil { ctx.LogE("rx", sds, err, "decryption") + return pipeW.CloseWithError(err) + } + if err = pipeWB.Flush(); err != nil { + ctx.LogE("rx", sds, err, "decryption flush") + return pipeW.CloseWithError(err) } + return pipeW.Close() }(job) var pkt Pkt var err error @@ -174,7 +182,9 @@ func (ctx *Ctx) Toss( cmd.Stdin = newNotification(notify, fmt.Sprintf( "Exec from %s: %s", sender.Name, argsStr, ), output) - cmd.Run() + if err = cmd.Run(); err != nil { + ctx.LogE("rx", sds, err, "notify") + } } } } @@ -182,7 +192,7 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { @@ -233,18 +243,22 @@ func (ctx *Ctx) Toss( goto Closing } if err = bufW.Flush(); err != nil { - tmp.Close() + tmp.Close() // #nosec G104 ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } if err = tmp.Sync(); err != nil { - tmp.Close() + tmp.Close() // #nosec G104 + ctx.LogE("rx", sds, err, "copy") + isBad = true + goto Closing + } + if err = tmp.Close(); err != nil { ctx.LogE("rx", sds, err, "copy") isBad = true goto Closing } - tmp.Close() dstPathOrig := filepath.Join(*incoming, dst) dstPath := dstPathOrig dstPathCtr := 0 @@ -274,7 +288,7 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { @@ -292,7 +306,9 @@ func (ctx *Ctx) Toss( dst, humanize.IBytes(uint64(pktSize)), ), nil) - cmd.Run() + if err = cmd.Run(); err != nil { + ctx.LogE("rx", sds, err, "notify") + } } } case PktTypeFreq: @@ -341,7 +357,7 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { @@ -356,7 +372,9 @@ func (ctx *Ctx) Toss( cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf( "Freq from %s: %s", sender.Name, src, ), nil) - cmd.Run() + if err = cmd.Run(); err != nil { + ctx.LogE("rx", sds, err, "notify") + } } } case PktTypeTrns: @@ -385,7 +403,7 @@ func (ctx *Ctx) Toss( if !dryRun { if doSeen { if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { - fd.Close() + fd.Close() // #nosec G104 } } if err = os.Remove(job.Fd.Name()); err != nil { @@ -398,7 +416,7 @@ func (ctx *Ctx) Toss( isBad = true } Closing: - pipeR.Close() + pipeR.Close() // #nosec G104 } return isBad }