From: Sergey Matveev Date: Sat, 20 Feb 2021 16:42:41 +0000 (+0300) Subject: .hdr files X-Git-Tag: v6.1.0^2~1 X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=commitdiff_plain;h=785dbe18183ba25a478a50c1d96fe263b5ab43f2 .hdr files --- diff --git a/doc/cfg.texi b/doc/cfg.texi index 3e0b882..cc6a24a 100644 --- a/doc/cfg.texi +++ b/doc/cfg.texi @@ -9,6 +9,7 @@ Example @url{https://hjson.org/, Hjson} configuration file: log: /var/spool/nncp/log umask: "022" noprogress: true + nohdr: true notify: { file: { @@ -103,6 +104,9 @@ Enabled @strong{noprogress} option disabled progress showing for many commands by default. You can always force its showing with @option{-progress} command line option anyway. +@anchor{CfgNoHdr} +@strong{nohdr} option disables @ref{HdrFile, .hdr} files usage. + @anchor{CfgNotify} @strong{notify} section contains notification settings for successfully tossed file, freq and exec packets. Corresponding @strong{from} and diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 34d39d8..9e9c669 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -24,6 +24,11 @@ online демоны не выполнят проверку целостност Оптимизация: для файлов, скачивание которых не было продолжено, сразу же вычисляет контрольная сумма, пропуская промежуточный @file{.nock} шаг. +@item +Возможность хранения заголовков зашифрованных пакетов в @file{.hdr} +файлах, рядом с самими пакетами. Это может существенно повысить скорость +получения списка пакетов на файловых системах с большим размером блока. + @end itemize @node Релиз 6.0.0 diff --git a/doc/news.texi b/doc/news.texi index eb4fbbe..6be8466 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -25,6 +25,11 @@ perform integrity check. Optimization: files, that are not resumed, are checksummed immediately during the online download, skipping @file{.nock}-intermediate step. +@item +Ability to store encrypted packet's header in @file{.hdr} file, close to +the packet itself. That can greatly increase performance of packets +listing on filesystems with big block's size. + @end itemize @node Release 6.0.0 diff --git a/doc/spool.texi b/doc/spool.texi index aea7976..8dcaa3e 100644 --- a/doc/spool.texi +++ b/doc/spool.texi @@ -3,52 +3,78 @@ Spool directory holds @ref{Encrypted, encrypted packets} received from remote nodes and queued for sending to them. It has the following -example structure: +example structure with just single outbound (@code{tx}) packet +@code{LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ} to the node +@code{2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ}: @example -spool/tmp/ -spool/2WHB...OABQ/rx.lock -spool/2WHB...OABQ/rx/5ZIB...UMKW.part -spool/2WHB...OABQ/tx.lock -spool/2WHB...OABQ/toss.lock -spool/BYRR...CG6Q/rx.lock -spool/BYRR...CG6Q/rx/MLZ6...Q3SQ.nock -spool/BYRR...CG6Q/rx/ -spool/BYRR...CG6Q/tx.lock -spool/BYRR...CG6Q/tx/AQUT...DGNT.seen -spool/BYRR...CG6Q/tx/NSYY...ZUU6 -spool/BYRR...CG6Q/tx/VCSR...3VXX.seen -spool/BYRR...CG6Q/tx/ZI5U...5RRQ +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/toss.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx/ +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx/LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ +spool/tmp @end example -@itemize +@table @file -@item Except for @file{tmp}, all other directories are Base32-encoded -node identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example). +@item tmp +directory contains various temporary files that under normal +circumstances are renamed to necessary files inside other directories. +All directories in @file{spool} @strong{have to} be on the same +filesystem for working renaming. -@item Each node subdirectory has @file{rx} (received, partially received -and currently unprocessed packets) and @file{tx} (for outbound packets) -directories. +@item 2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ +is an example Base32-encoded neighbour identifier. -@item Each @file{rx}/@file{tx} directory contains one file per encrypted -packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So -it can be integrity checked at any time. +@item rx, tx +directories are for incoming and outgoing encrypted packets. @file{rx} +contains currently unfinished, non-checked, unprocessed, etc packets. -@item @file{5ZIB...UMKW.part} is partially received file from -@file{2WHB...OABQ} node. @file{tx} directory can not contain partially -written files -- they are moved atomically from @file{tmp}. +@item toss.lock, rx.lock, tx.lock +Lock files. Only single process can work with @file{rx}/@file{tx} +directories at once. -@item @file{rx} can contain received, but currently integrity unchecked -files with @file{.nock} extension. It is completely the same as an -ordinary encrypted packets, but its integrity after online download was -not done. After successful checksum verification, @file{.nock} extension -is trimmed. +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ +is an example @ref{Encrypted, encrypted packet}. Its filename is Base32 +encoded BLAKE2b hash of the whole contents. It can be integrity checked +anytime. -@item When @ref{nncp-toss} utility is called with @option{-seen} option, -it will create empty @file{XXX.seen} files, telling that some kind of -packet was already tossed sometime. +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.part +is an example @strong{partly} received file. It can appear only when +online transfer is used. Its filename is sent by remote side and until +file is fully downloaded -- it plays no role. -@item Only one process can work with @file{rx}/@file{tx} directories at -once, so there are corresponding lock files. +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.nock +non-checksummed (NoCK) @strong{fully} received file. Its checksum is +verified against its filename either by @ref{nncp-check}, or by working +online daemons. If it is correct, then its extension is trimmed. -@end itemize +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.seen +@ref{nncp-toss} utility can be invoked with @option{-seen} option, +leading to creation of @file{.seen} files, telling that the file with +specified hash has already been processed before. It could be useful +when there are use-cases where multiple ways of packets transfer +available and there is possibility of duplicates reception. You have to +manually remove them, when you do not need them (probably because they +are expired). + +@anchor{HdrFile} +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.hdr +If no @ref{CfgNoHdr, nohdr} option is enabled in configuration file, +then @file{.hdr} files are automatically created for every ordinary +(fully received and checksummed) packet. It literally contains just the +header of the corresponding packet. It will be automatically created +even during simple @ref{nncp-stat} call. On filesystems with big +blocksize (ZFS for example) it can greatly help listing the packets in +directories, because it prevents unnecessary read-amplification. On +other filesystems probably it won't help at all, or even harm +performance. + +There is a hack: you can create more dense @file{.hdr} allocation by +removing all @file{.hdr} files and then running @command{nncp-stat}, +that will recreate them. In many cases many @file{.hdr} files will be +allocated more or less linearly on the disk, decreasing listing time +even more. + +@end table diff --git a/src/cfg.go b/src/cfg.go index 9d94953..6e781d1 100644 --- a/src/cfg.go +++ b/src/cfg.go @@ -119,6 +119,7 @@ type CfgJSON struct { Umask string `json:"umask,omitempty"` OmitPrgrs bool `json:"noprogress,omitempty"` + NoHdr bool `json:"nohdr,omitempty"` Notify *NotifyJSON `json:"notify,omitempty"` @@ -463,11 +464,16 @@ func CfgParse(data []byte) (*Ctx, error) { if cfgJSON.OmitPrgrs { showPrgrs = false } + hdrUsage := true + if cfgJSON.NoHdr { + hdrUsage = false + } ctx := Ctx{ Spool: spoolPath, LogPath: logPath, UmaskForce: umaskForce, ShowPrgrs: showPrgrs, + HdrUsage: hdrUsage, Self: self, Neigh: make(map[NodeId]*Node, len(cfgJSON.Neigh)), Alias: make(map[string]*NodeId), diff --git a/src/check.go b/src/check.go index 2569e6a..ac26e9c 100644 --- a/src/check.go +++ b/src/check.go @@ -82,6 +82,7 @@ func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[32]byte) (int64, error) { if err != nil { return 0, err } + defer fd.Close() fi, err := fd.Stat() if err != nil { return 0, err @@ -101,5 +102,18 @@ func (ctx *Ctx) CheckNoCK(nodeId *NodeId, hshValue *[32]byte) (int64, error) { if err = os.Rename(pktPath+NoCKSuffix, pktPath); err != nil { return 0, err } - return size, DirSync(dirToSync) + if err = DirSync(dirToSync); err != nil { + return size, err + } + if ctx.HdrUsage { + if _, err = fd.Seek(0, io.SeekStart); err != nil { + return size, err + } + _, pktEncRaw, err := ctx.HdrRead(fd) + if err != nil { + return size, err + } + ctx.HdrWrite(pktEncRaw, pktPath) + } + return size, err } diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index 106f820..60a9330 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -179,6 +179,8 @@ func main() { if *doDelete { if err = os.Remove(job.Path); err != nil { log.Fatalln("Error during deletion:", err) + } else if ctx.HdrUsage { + os.Remove(job.Path + nncp.HdrSuffix) } } ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "") @@ -303,7 +305,10 @@ func main() { if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { ctx.LogI("nncp-bundle", les, "removed") if !*dryRun { - os.Remove(dstPath) // #nosec G104 + os.Remove(dstPath) + if ctx.HdrUsage { + os.Remove(dstPath + nncp.HdrSuffix) + } } } else { ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") @@ -412,6 +417,9 @@ func main() { if err = nncp.DirSync(dstDirPath); err != nil { log.Fatalln("Error during syncing:", err) } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncBuf, dstPath) + } } } for _, le := range les { diff --git a/src/cmd/nncp-cfgnew/main.go b/src/cmd/nncp-cfgnew/main.go index 08c4875..f511c35 100644 --- a/src/cmd/nncp-cfgnew/main.go +++ b/src/cmd/nncp-cfgnew/main.go @@ -104,6 +104,8 @@ func main() { # umask: "022" # Omit progress showing by default # noprogress: true + # Do not use .hdr files + # nohdr: true # Enable notification email sending # notify: { diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index 0802f51..b036d70 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -40,6 +40,7 @@ func usage() { fmt.Fprintf(os.Stderr, " %s [options] -node NODE -part\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -seen\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -nock\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -node NODE -hdr\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE {-rx|-tx}\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -pkt PKT\n", os.Args[0]) fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays") @@ -51,6 +52,7 @@ func main() { var ( cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") doTmp = flag.Bool("tmp", false, "Remove all temporary files") + doHdr = flag.Bool("hdr", false, "Remove all .hdr files") doLock = flag.Bool("lock", false, "Remove all lock files") nodeRaw = flag.String("node", "", "Node to remove files in") doRx = flag.Bool("rx", false, "Process received packets") @@ -178,21 +180,10 @@ func main() { ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") return nil } - if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") - if *dryRun { - return nil - } - return os.Remove(path) - } - if *doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") - if *dryRun { - return nil - } - return os.Remove(path) - } - if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) { + if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) || + (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) || + (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) || + (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil @@ -206,9 +197,7 @@ func main() { } return os.Remove(path) } - if !*doSeen && - !*doNoCK && - !*doPart && + if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") @@ -220,12 +209,12 @@ func main() { return nil }) } - if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doPart { + if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doHdr || *doPart { if err = remove(nncp.TRx); err != nil { log.Fatalln("Can not remove:", err) } } - if *pktRaw != "" || *doTx { + if *pktRaw != "" || *doTx || *doHdr { if err = remove(nncp.TTx); err != nil { log.Fatalln("Can not remove:", err) } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index b975467..093f108 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -28,7 +28,6 @@ import ( "os" "path/filepath" - xdr "github.com/davecgh/go-xdr/xdr2" "go.cypherpunks.ru/nncp/v5" ) @@ -183,8 +182,7 @@ func main() { isBad = true continue } - var pktEnc nncp.PktEnc - _, err = xdr.Unmarshal(fd, &pktEnc) + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { ctx.LogD("nncp-xfer", les, "is not a packet") fd.Close() // #nosec G104 @@ -249,6 +247,14 @@ func main() { isBad = true } } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + tmp.Checksum(), + )) + } } } @@ -389,6 +395,8 @@ Tx: if err = os.Remove(job.Path); err != nil { ctx.LogE("nncp-xfer", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + nncp.HdrSuffix) } } } diff --git a/src/ctx.go b/src/ctx.go index ebd3102..d838828 100644 --- a/src/ctx.go +++ b/src/ctx.go @@ -40,6 +40,7 @@ type Ctx struct { UmaskForce *int Quiet bool ShowPrgrs bool + HdrUsage bool Debug bool NotifyFile *FromToJSON NotifyFreq *FromToJSON diff --git a/src/jobs.go b/src/jobs.go index 3e97b2e..37a31b1 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -18,6 +18,7 @@ along with this program. If not, see . package nncp import ( + "bytes" "os" "path/filepath" "strings" @@ -30,6 +31,8 @@ type TRxTx string const ( TRx TRxTx = "rx" TTx TRxTx = "tx" + + HdrSuffix = ".hdr" ) type Job struct { @@ -39,6 +42,42 @@ type Job struct { HshValue *[32]byte } +func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) { + var pktEnc PktEnc + _, err := xdr.Unmarshal(fd, &pktEnc) + if err != nil { + return nil, nil, err + } + var raw bytes.Buffer + if _, err = xdr.Marshal(&raw, pktEnc); err != nil { + panic(err) + } + return &pktEnc, raw.Bytes(), nil +} + +func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { + tmpHdr, err := ctx.NewTmpFile() + if err != nil { + ctx.LogE("hdr-write", []LE{}, err, "new") + return err + } + if _, err = tmpHdr.Write(pktEncRaw); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "write") + os.Remove(tmpHdr.Name()) + return err + } + if err = tmpHdr.Close(); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "close") + os.Remove(tmpHdr.Name()) + return err + } + if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "rename") + return err + } + return err +} + func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) @@ -54,27 +93,41 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { return } for _, fi := range fis { + name := fi.Name() var hshValue []byte if nock { - if !strings.HasSuffix(fi.Name(), NoCKSuffix) { + if !strings.HasSuffix(name, NoCKSuffix) || + len(name) != Base32Encoded32Len+len(NoCKSuffix) { continue } hshValue, err = Base32Codec.DecodeString( - strings.TrimSuffix(fi.Name(), NoCKSuffix), + strings.TrimSuffix(name, NoCKSuffix), ) } else { - hshValue, err = Base32Codec.DecodeString(fi.Name()) + if len(name) != Base32Encoded32Len { + continue + } + hshValue, err = Base32Codec.DecodeString(name) } if err != nil { continue } - pth := filepath.Join(rxPath, fi.Name()) - fd, err := os.Open(pth) + pth := filepath.Join(rxPath, name) + hdrExists := true + var fd *os.File + if nock { + fd, err = os.Open(pth) + } else { + fd, err = os.Open(pth + HdrSuffix) + if err != nil && os.IsNotExist(err) { + hdrExists = false + fd, err = os.Open(pth) + } + } if err != nil { continue } - var pktEnc PktEnc - _, err = xdr.Unmarshal(fd, &pktEnc) + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) fd.Close() if err != nil || pktEnc.Magic != MagicNNCPEv4 { continue @@ -82,12 +135,15 @@ func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { ctx.LogD("jobs", LEs{ {"XX", string(xx)}, {"Node", pktEnc.Sender}, - {"Name", fi.Name()}, + {"Name", name}, {"Nice", int(pktEnc.Nice)}, {"Size", fi.Size()}, }, "taken") + if !hdrExists && ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, pth) + } job := Job{ - PktEnc: &pktEnc, + PktEnc: pktEnc, Path: pth, Size: fi.Size(), HshValue: new([32]byte), diff --git a/src/nncp.go b/src/nncp.go index 5e44d5b..16e5b9a 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -37,6 +37,8 @@ You should have received a copy of the GNU General Public License along with this program. If not, see .` ) +const Base32Encoded32Len = 52 + var ( Version string = "6.1.0" diff --git a/src/pkt.go b/src/pkt.go index 4dea1bf..c2715e4 100644 --- a/src/pkt.go +++ b/src/pkt.go @@ -192,14 +192,14 @@ func PktEncWrite( size, padSize int64, data io.Reader, out io.Writer, -) error { +) ([]byte, error) { pubEph, prvEph, err := box.GenerateKey(rand.Reader) if err != nil { - return err + return nil, err } var pktBuf bytes.Buffer if _, err := xdr.Marshal(&pktBuf, pkt); err != nil { - return err + return nil, err } tbs := PktTbs{ Magic: MagicNNCPEv4, @@ -210,7 +210,7 @@ func PktEncWrite( } var tbsBuf bytes.Buffer if _, err = xdr.Marshal(&tbsBuf, &tbs); err != nil { - return err + return nil, err } signature := new([ed25519.SignatureSize]byte) copy(signature[:], ed25519.Sign(our.SignPrv, tbsBuf.Bytes())) @@ -222,26 +222,31 @@ func PktEncWrite( ExchPub: *pubEph, Sign: *signature, } - if _, err = xdr.Marshal(out, &pktEnc); err != nil { - return err + tbsBuf.Reset() + if _, err = xdr.Marshal(&tbsBuf, &pktEnc); err != nil { + return nil, err + } + pktEncRaw := tbsBuf.Bytes() + if _, err = out.Write(pktEncRaw); err != nil { + return nil, err } sharedKey := new([32]byte) curve25519.ScalarMult(sharedKey, prvEph, their.ExchPub) kdf, err := blake2b.NewXOF(KDFXOFSize, sharedKey[:]) if err != nil { - return err + return nil, err } if _, err = kdf.Write(MagicNNCPEv4[:]); err != nil { - return err + return nil, err } key := make([]byte, chacha20poly1305.KeySize) if _, err = io.ReadFull(kdf, key); err != nil { - return err + return nil, err } aead, err := chacha20poly1305.New(key) if err != nil { - return err + return nil, err } nonce := make([]byte, aead.NonceSize()) @@ -249,31 +254,31 @@ func PktEncWrite( sizeBuf := make([]byte, 8+aead.Overhead()) binary.BigEndian.PutUint64(sizeBuf, uint64(sizeWithTags(int64(fullSize)))) if _, err = out.Write(aead.Seal(sizeBuf[:0], nonce, sizeBuf[:8], nil)); err != nil { - return err + return nil, err } lr := io.LimitedReader{R: data, N: size} mr := io.MultiReader(&pktBuf, &lr) written, err := aeadProcess(aead, nonce, true, mr, out) if err != nil { - return err + return nil, err } if written != fullSize { - return io.ErrUnexpectedEOF + return nil, io.ErrUnexpectedEOF } if padSize > 0 { if _, err = io.ReadFull(kdf, key); err != nil { - return err + return nil, err } kdf, err = blake2b.NewXOF(blake2b.OutputLengthUnknown, key) if err != nil { - return err + return nil, err } if _, err = io.CopyN(out, kdf, padSize); err != nil { - return err + return nil, err } } - return nil + return pktEncRaw, nil } func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) (bool, error) { diff --git a/src/pkt_test.go b/src/pkt_test.go index cbf7818..079acbd 100644 --- a/src/pkt_test.go +++ b/src/pkt_test.go @@ -44,7 +44,7 @@ func TestPktEncWrite(t *testing.T) { if err != nil { panic(err) } - err = PktEncWrite( + _, err = PktEncWrite( nodeOur, nodeTheir.Their(), pkt, @@ -95,7 +95,7 @@ func TestPktEncRead(t *testing.T) { if err != nil { panic(err) } - err = PktEncWrite( + _, err = PktEncWrite( node1, node2.Their(), pkt, diff --git a/src/progress.go b/src/progress.go index 62e0f01..ca1f361 100644 --- a/src/progress.go +++ b/src/progress.go @@ -141,7 +141,7 @@ func Progress(prefix string, les LEs) { progressBarsLock.Unlock() } what := pkt - if len(what) >= 52 { // Base32 encoded + if len(what) >= Base32Encoded32Len { // Base32 encoded what = what[:16] + ".." + what[len(what)-16:] } what = prefix + " " + what diff --git a/src/sp.go b/src/sp.go index 3ff47ef..e120769 100644 --- a/src/sp.go +++ b/src/sp.go @@ -1107,9 +1107,9 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { continue } err = fd.Sync() - state.closeFd(filePathPart) if err != nil { state.Ctx.LogE("sp-file", lesp, err, "sync") + state.closeFd(filePathPart) continue } if hasherExists { @@ -1134,8 +1134,25 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() + if !state.Ctx.HdrUsage { + state.closeFd(filePathPart) + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "seek") + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) + state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "HdrRead") + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) continue } + state.closeFd(filePathPart) if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { state.Ctx.LogE("sp-file", lesp, err, "rename") continue @@ -1162,15 +1179,19 @@ func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) { } lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) state.Ctx.LogD("sp-done", lesp, "removing") - err := os.Remove(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(done.Hash[:]), - )) + ) + err := os.Remove(pth) lesp = append(lesp, LE{"XX", string(TTx)}) if err == nil { state.Ctx.LogI("sp-done", lesp, "") + if state.Ctx.HdrUsage { + os.Remove(pth + HdrSuffix) + } } else { state.Ctx.LogE("sp-done", lesp, err, "") } diff --git a/src/tmp.go b/src/tmp.go index e1e9127..b99ef3d 100644 --- a/src/tmp.go +++ b/src/tmp.go @@ -92,6 +92,10 @@ func DirSync(dirPath string) error { return fd.Close() } +func (tmp *TmpFileWHash) Checksum() string { + return Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) +} + func (tmp *TmpFileWHash) Commit(dir string) error { var err error if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { @@ -108,7 +112,7 @@ func (tmp *TmpFileWHash) Commit(dir string) error { if err = tmp.Fd.Close(); err != nil { return err } - checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) + checksum := tmp.Checksum() tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit") if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil { return err diff --git a/src/toss.go b/src/toss.go index 9916cc6..576abb1 100644 --- a/src/toss.go +++ b/src/toss.go @@ -204,6 +204,8 @@ func (ctx *Ctx) Toss( if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } case PktTypeFile: @@ -300,6 +302,8 @@ func (ctx *Ctx) Toss( if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFile != nil { cmd := exec.Command( @@ -369,6 +373,8 @@ func (ctx *Ctx) Toss( if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFreq != nil { cmd := exec.Command( @@ -415,6 +421,8 @@ func (ctx *Ctx) Toss( if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } default: diff --git a/src/toss_test.go b/src/toss_test.go index 3411382..807c49a 100644 --- a/src/toss_test.go +++ b/src/toss_test.go @@ -459,7 +459,7 @@ func TestTossTrns(t *testing.T) { } copy(pktTrans.Path[:], nodeOur.Id[:]) var dst bytes.Buffer - if err := PktEncWrite( + if _, err := PktEncWrite( ctx.Self, ctx.Neigh[*nodeOur.Id], &pktTrans, diff --git a/src/tx.go b/src/tx.go index c89957b..3e57f9b 100644 --- a/src/tx.go +++ b/src/tx.go @@ -62,7 +62,9 @@ func (ctx *Ctx) Tx( } expectedSize := size for i := 0; i < len(hops); i++ { - expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize) + expectedSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+expectedSize) } padSize := minSize - expectedSize if padSize < 0 { @@ -79,16 +81,23 @@ func (ctx *Ctx) Tx( errs := make(chan error) curSize := size pipeR, pipeW := io.Pipe() + var pktEncRaw []byte go func(size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, {"Size", size}, }, "wrote") - errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) + pktEncRaw, err = PktEncWrite( + ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + ) + errs <- err dst.Close() // #nosec G104 }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize + curSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+curSize) + + padSize var pipeRPrev io.Reader for i := 1; i < len(hops); i++ { @@ -101,7 +110,8 @@ func (ctx *Ctx) Tx( {"Nice", int(nice)}, {"Size", size}, }, "trns wrote") - errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + errs <- err dst.Close() // #nosec G104 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) @@ -124,6 +134,12 @@ func (ctx *Ctx) Tx( nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104 + if err != nil { + return lastNode, err + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) + } return lastNode, err }