X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=blobdiff_plain;f=src%2Ftoss.go;h=4cc4392a6f2404478b5893ecf34193d6b0149e5d;hp=f23b5836fe6374f53aefebdfdf6f95c5684519f4;hb=8f99a73cf379f046b3bfb0f4a1a715bb5e215106;hpb=ab7c7eca0e53661f0ba904c2a6ba752990bea367 diff --git a/src/toss.go b/src/toss.go index f23b583..4cc4392 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2021 Sergey Matveev +Copyright (C) 2016-2022 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -43,9 +43,13 @@ import ( ) const ( - SeenSuffix = ".seen" + SeenDir = "seen" ) +func jobPath2Seen(jobPath string) string { + return filepath.Join(filepath.Dir(jobPath), SeenDir, filepath.Base(jobPath)) +} + func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader { lines := []string{ "From: " + fromTo.From, @@ -87,7 +91,7 @@ func jobProcess( pktSize uint64, jobPath string, decompressor *zstd.Decoder, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, ) error { defer pipeR.Close() sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] @@ -120,8 +124,8 @@ func jobProcess( } argsStr := strings.Join(append([]string{handle}, args...), " ") les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr}) - cmdline, exists := sender.Exec[handle] - if !exists || len(cmdline) == 0 { + cmdline := sender.Exec[handle] + if len(cmdline) == 0 { err = errors.New("No handle found") ctx.LogE( "rx-no-handle", les, err, @@ -153,9 +157,12 @@ func jobProcess( } else { cmd.Stdin = pipeR } - output, err := cmd.Output() + output, err := cmd.CombinedOutput() if err != nil { - ctx.LogE("rx-hande", les, err, func(les LEs) string { + les = append(les, LE{"Output", strings.Split( + strings.Trim(string(output), "\n"), "\n"), + }) + ctx.LogE("rx-handle", les, err, func(les LEs) string { return fmt.Sprintf( "Tossing exec %s/%s (%s): %s: handling", sender.Name, pktName, @@ -165,11 +172,11 @@ func jobProcess( return err } if len(sendmail) > 0 && ctx.NotifyExec != nil { - notify, exists := ctx.NotifyExec[sender.Name+"."+handle] - if !exists { - notify, exists = ctx.NotifyExec["*."+handle] + notify := ctx.NotifyExec[sender.Name+"."+handle] + if notify == nil { + notify = ctx.NotifyExec["*."+handle] } - if exists { + if notify != nil { cmd := exec.Command( sendmail[0], append(sendmail[1:], notify.To)..., @@ -198,8 +205,22 @@ func jobProcess( }) if !dryRun && jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { - fd.Close() // #nosec G104 + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } } } if err = os.Remove(jobPath); err != nil { @@ -212,7 +233,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } @@ -298,7 +319,7 @@ func jobProcess( return err } if err = bufW.Flush(); err != nil { - tmp.Close() // #nosec G104 + tmp.Close() ctx.LogE("rx-flush", les, err, func(les LEs) string { return fmt.Sprintf( "Tossing file %s/%s (%s): %s: flushing", @@ -308,16 +329,18 @@ func jobProcess( }) return err } - if err = tmp.Sync(); err != nil { - tmp.Close() // #nosec G104 - ctx.LogE("rx-sync", les, err, func(les LEs) string { - return fmt.Sprintf( - "Tossing file %s/%s (%s): %s: syncing", - sender.Name, pktName, - humanize.IBytes(pktSize), dst, - ) - }) - return err + if !NoSync { + if err = tmp.Sync(); err != nil { + tmp.Close() + ctx.LogE("rx-sync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: syncing", + sender.Name, pktName, + humanize.IBytes(pktSize), dst, + ) + }) + return err + } } if err = tmp.Close(); err != nil { ctx.LogE("rx-close", les, err, func(les LEs) string { @@ -380,8 +403,22 @@ func jobProcess( if !dryRun { if jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { - fd.Close() // #nosec G104 + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } } } if err = os.Remove(jobPath); err != nil { @@ -394,7 +431,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } if len(sendmail) > 0 && ctx.NotifyFile != nil { @@ -494,8 +531,22 @@ func jobProcess( if !dryRun { if jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { - fd.Close() // #nosec G104 + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } } } if err = os.Remove(jobPath); err != nil { @@ -508,7 +559,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } if len(sendmail) > 0 && ctx.NotifyFreq != nil { @@ -555,11 +606,35 @@ func jobProcess( } ctx.LogD("rx-tx", les, logMsg) if !dryRun { - if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil { - ctx.LogE("rx", les, err, func(les LEs) string { - return logMsg(les) + ": txing" - }) - return err + if len(node.Via) == 0 { + if err = ctx.TxTrns(node, nice, int64(pktSize), pipeR); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": txing" + }) + return err + } + } else { + via := node.Via[:len(node.Via)-1] + node = ctx.Neigh[*node.Via[len(node.Via)-1]] + node = &Node{Id: node.Id, Via: via, ExchPub: node.ExchPub} + pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:]) + if err != nil { + panic(err) + } + if _, _, err = ctx.Tx( + node, + pktTrns, + nice, + int64(pktSize), 0, MaxFileSize, + pipeR, + pktName, + nil, + ); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": txing" + }) + return err + } } } ctx.LogI("rx", les, func(les LEs) string { @@ -572,8 +647,22 @@ func jobProcess( }) if !dryRun && jobPath != "" { if doSeen { - if fd, err := os.Create(jobPath + SeenSuffix); err == nil { - fd.Close() // #nosec G104 + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } } } if err = os.Remove(jobPath); err != nil { @@ -587,7 +676,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } @@ -630,7 +719,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, nodeId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) logMsgNode := func(les LEs) string { return fmt.Sprintf( "%s: echoing to: %s", logMsg(les), node.Name, @@ -651,7 +740,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, nodeId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) logMsgNode := func(les LEs) string { return fmt.Sprintf("%s: echo to: %s", logMsg(les), node.Name) } @@ -661,10 +750,16 @@ func jobProcess( }) continue } - if nodeId != sender.Id { + if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, err = ctx.Tx( - node, &pkt, nice, int64(pktSize), 0, fullPipeR, pktName, nil, + if _, _, err = ctx.Tx( + node, + &pkt, + nice, + int64(pktSize), 0, MaxFileSize, + fullPipeR, + pktName, + nil, ); err != nil { ctx.LogE("rx-area", lesEcho, err, logMsgNode) return err @@ -691,7 +786,7 @@ func jobProcess( seenDir := filepath.Join( ctx.Spool, ctx.SelfId.String(), AreaDir, area.Id.String(), ) - seenPath := filepath.Join(seenDir, msgHash+SeenSuffix) + seenPath := filepath.Join(seenDir, msgHash) if _, err := os.Stat(seenPath); err == nil { ctx.LogD("rx-area-seen", les, func(les LEs) string { return logMsg(les) + ": already seen" @@ -708,7 +803,7 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) } } return nil @@ -763,7 +858,7 @@ func jobProcess( uint64(pktSizeWithoutEnc(int64(pktSize))), "", decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK, ) }() _, _, _, err = PktEncRead( @@ -775,8 +870,9 @@ func jobProcess( nil, ) if err != nil { + ctx.LogE("rx-area-pkt-enc-read2", les, err, logMsg) pipeW.CloseWithError(err) - go func() { <-errs }() + <-errs return err } pipeW.Close() @@ -808,9 +904,69 @@ func jobProcess( }) return err } else if ctx.HdrUsage { - os.Remove(jobPath + HdrSuffix) + os.Remove(JobPath2Hdr(jobPath)) + } + } + + case PktTypeACK: + if noACK { + return nil + } + hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize]) + les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh}) + logMsg := func(les LEs) string { + return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh) + } + ctx.LogD("rx-ack", les, logMsg) + pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh) + if _, err := os.Stat(pktPath); err == nil { + if !dryRun { + if err = os.Remove(pktPath); err != nil { + ctx.LogE("rx-ack", les, err, func(les LEs) string { + return logMsg(les) + ": removing packet" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(pktPath)) + } + } + } else { + ctx.LogD("rx-ack", les, func(les LEs) string { + return logMsg(les) + ": already disappeared" + }) + } + if !dryRun && doSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } + } + } + if !dryRun { + if err = os.Remove(jobPath); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": removing job" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(jobPath)) } } + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh) + }) default: err = errors.New("unknown type") @@ -832,7 +988,7 @@ func (ctx *Ctx) Toss( nodeId *NodeId, xx TRxTx, nice uint8, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, ) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { @@ -873,6 +1029,18 @@ func (ctx *Ctx) Toss( isBad = true continue } + sender := ctx.Neigh[*job.PktEnc.Sender] + if sender == nil { + err := errors.New("unknown node") + ctx.LogE("rx-open", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s", + ctx.NodeName(job.PktEnc.Sender), pktName, + ) + }) + isBad = true + continue + } errs := make(chan error, 1) var sharedKey []byte Retry: @@ -883,12 +1051,12 @@ func (ctx *Ctx) Toss( pipeR, pktName, les, - ctx.Neigh[*job.PktEnc.Sender], + sender, job.PktEnc.Nice, uint64(pktSizeWithoutEnc(job.Size)), job.Path, decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK, ) }() pipeWB := bufio.NewWriter(pipeW) @@ -911,7 +1079,7 @@ func (ctx *Ctx) Toss( if err != nil { isBad = true fd.Close() - go func() { <-errs }() + <-errs continue } if err = <-errs; err == JobRepeatProcess { @@ -938,8 +1106,15 @@ func (ctx *Ctx) Toss( func (ctx *Ctx) AutoToss( nodeId *NodeId, nice uint8, - doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, ) (chan struct{}, chan bool) { + dw, err := ctx.NewDirWatcher( + filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), + time.Second, + ) + if err != nil { + log.Fatalln(err) + } finish := make(chan struct{}) badCode := make(chan bool) go func() { @@ -947,14 +1122,14 @@ func (ctx *Ctx) AutoToss( for { select { case <-finish: + dw.Close() badCode <- bad - break - default: + return + case <-dw.C: + bad = !ctx.Toss( + nodeId, TRx, nice, false, + doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad } - time.Sleep(time.Second) - bad = !ctx.Toss( - nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad } }() return finish, badCode