X-Git-Url: http://www.git.cypherpunks.ru/?a=blobdiff_plain;f=src%2Ftoss.go;h=b4363f713ea7856a16df74e26ff5a10ae0413ae6;hb=72f42f8288689ae89e021ba33ebca8d551a784d7;hp=e383257db77c43e31ddd8127160276526376983e;hpb=891cee4997bb0a269d4b2f35311bab104bdc3283;p=nncp.git diff --git a/src/toss.go b/src/toss.go index e383257..b4363f7 100644 --- a/src/toss.go +++ b/src/toss.go @@ -1,6 +1,6 @@ /* NNCP -- Node to Node copy, utilities for store-and-forward data exchange -Copyright (C) 2016-2022 Sergey Matveev +Copyright (C) 2016-2023 Sergey Matveev This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,7 +24,7 @@ import ( "errors" "fmt" "io" - "io/ioutil" + "io/fs" "log" "mime" "os" @@ -91,7 +91,7 @@ func jobProcess( pktSize uint64, jobPath string, decompressor *zstd.Decoder, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, ) error { defer pipeR.Close() sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"] @@ -357,7 +357,7 @@ func jobProcess( dstPathCtr := 0 for { if _, err = os.Stat(dstPath); err != nil { - if os.IsNotExist(err) { + if errors.Is(err, fs.ErrNotExist) { break } ctx.LogE("rx-stat", les, err, func(les LEs) string { @@ -475,7 +475,7 @@ func jobProcess( ) return err } - dstRaw, err := ioutil.ReadAll(pipeR) + dstRaw, err := io.ReadAll(pipeR) if err != nil { ctx.LogE("rx-read", les, err, func(les LEs) string { return fmt.Sprintf( @@ -621,7 +621,7 @@ func jobProcess( if err != nil { panic(err) } - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, pktTrns, nice, @@ -752,7 +752,7 @@ func jobProcess( } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, &pkt, nice, @@ -858,7 +858,7 @@ func jobProcess( uint64(pktSizeWithoutEnc(int64(pktSize))), "", decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK, ) }() _, _, _, err = PktEncRead( @@ -908,6 +908,66 @@ func jobProcess( } } + case PktTypeACK: + if noACK { + return nil + } + hsh := Base32Codec.EncodeToString(pkt.Path[:MTHSize]) + les := append(les, LE{"Type", "ack"}, LE{"Pkt", hsh}) + logMsg := func(les LEs) string { + return fmt.Sprintf("Tossing ack %s/%s: %s", sender.Name, pktName, hsh) + } + ctx.LogD("rx-ack", les, logMsg) + pktPath := filepath.Join(ctx.Spool, sender.Id.String(), string(TTx), hsh) + if _, err := os.Stat(pktPath); err == nil { + if !dryRun { + if err = os.Remove(pktPath); err != nil { + ctx.LogE("rx-ack", les, err, func(les LEs) string { + return logMsg(les) + ": removing packet" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(pktPath)) + } + } + } else { + ctx.LogD("rx-ack", les, func(les LEs) string { + return logMsg(les) + ": already disappeared" + }) + } + if !dryRun && doSeen { + if err := ensureDir(filepath.Dir(jobPath), SeenDir); err != nil { + return err + } + if fd, err := os.Create(jobPath2Seen(jobPath)); err == nil { + fd.Close() + if err = DirSync(filepath.Dir(jobPath)); err != nil { + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + sender.Name, pktName, + humanize.IBytes(pktSize), + filepath.Base(jobPath), + ) + }) + return err + } + } + } + if !dryRun { + if err = os.Remove(jobPath); err != nil { + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": removing job" + }) + return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(jobPath)) + } + } + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf("Got ACK packet from %s of %s", sender.Name, hsh) + }) + default: err = errors.New("unknown type") ctx.LogE( @@ -928,7 +988,7 @@ func (ctx *Ctx) Toss( nodeId *NodeId, xx TRxTx, nice uint8, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, ) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { @@ -996,14 +1056,14 @@ func (ctx *Ctx) Toss( uint64(pktSizeWithoutEnc(job.Size)), job.Path, decompressor, - dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, + dryRun, doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK, ) }() pipeWB := bufio.NewWriter(pipeW) sharedKey, _, _, err = PktEncRead( ctx.Self, ctx.Neigh, - bufio.NewReader(fd), + bufio.NewReaderSize(fd, MTHBlockSize), pipeWB, sharedKey == nil, sharedKey, @@ -1046,7 +1106,7 @@ func (ctx *Ctx) Toss( func (ctx *Ctx) AutoToss( nodeId *NodeId, nice uint8, - doSeen, noFile, noFreq, noExec, noTrns, noArea bool, + doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK bool, ) (chan struct{}, chan bool) { dw, err := ctx.NewDirWatcher( filepath.Join(ctx.Spool, nodeId.String(), string(TRx)), @@ -1068,7 +1128,7 @@ func (ctx *Ctx) AutoToss( case <-dw.C: bad = !ctx.Toss( nodeId, TRx, nice, false, - doSeen, noFile, noFreq, noExec, noTrns, noArea) || bad + doSeen, noFile, noFreq, noExec, noTrns, noArea, noACK) || bad } } }()