X-Git-Url: http://www.git.cypherpunks.ru/?p=nncp.git;a=blobdiff_plain;f=src%2Fcmd%2Fnncp-xfer%2Fmain.go;h=df7acd3b9cc3f0b2b881f4f85073ff51ec40e2b7;hp=c784b4e6bfade94a0105447ac95a595133f154d2;hb=1d2ce674b042d07fd9b37a46578c8b62bb0345b7;hpb=b2e36aeaa5dc1c9649c6895d938208d92ddc3fa3 diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index c784b4e..df7acd3 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -20,13 +20,13 @@ package main import ( "bufio" + "errors" "flag" "fmt" "io" "log" "os" "path/filepath" - "strconv" xdr "github.com/davecgh/go-xdr/xdr2" "go.cypherpunks.ru/nncp/v5" @@ -51,6 +51,8 @@ func main() { spoolPath = flag.String("spool", "", "Override path to spool") logPath = flag.String("log", "", "Override path to logfile") quiet = flag.Bool("quiet", false, "Print only errors") + showPrgrs = flag.Bool("progress", false, "Force progress showing") + omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing") debug = flag.Bool("debug", false, "Print debug messages") version = flag.Bool("version", false, "Print version information") warranty = flag.Bool("warranty", false, "Print warranty information") @@ -77,7 +79,15 @@ func main() { log.Fatalln("-rx and -tx can not be set simultaneously") } - ctx, err := nncp.CtxFromCmdline(*cfgPath, *spoolPath, *logPath, *quiet, *debug) + ctx, err := nncp.CtxFromCmdline( + *cfgPath, + *spoolPath, + *logPath, + *quiet, + *showPrgrs, + *omitPrgrs, + *debug, + ) if err != nil { log.Fatalln("Error during initialization:", err) } @@ -107,20 +117,20 @@ func main() { ctx.LogD("nncp-xfer", sds, "no dir") goto Tx } - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "stat") + ctx.LogE("nncp-xfer", sds, err, "stat") isBad = true goto Tx } dir, err = os.Open(selfPath) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "open") + ctx.LogE("nncp-xfer", sds, err, "open") isBad = true goto Tx } fis, err = dir.Readdir(0) dir.Close() if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + ctx.LogE("nncp-xfer", sds, err, "read") isBad = true goto Tx } @@ -144,14 +154,14 @@ func main() { } dir, err = os.Open(filepath.Join(selfPath, fi.Name())) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "open") + ctx.LogE("nncp-xfer", sds, err, "open") isBad = true continue } fisInt, err := dir.Readdir(0) dir.Close() if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "read") + ctx.LogE("nncp-xfer", sds, err, "read") isBad = true continue } @@ -159,12 +169,16 @@ func main() { if !fi.IsDir() { continue } + // Check that it is valid Base32 encoding + if _, err = nncp.NodeIdFromString(fiInt.Name()); err != nil { + continue + } filename := filepath.Join(dir.Name(), fiInt.Name()) sds["file"] = filename delete(sds, "size") fd, err := os.Open(filename) if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "open") + ctx.LogE("nncp-xfer", sds, err, "open") isBad = true continue } @@ -180,9 +194,9 @@ func main() { fd.Close() continue } - sds["size"] = strconv.FormatInt(fiInt.Size(), 10) + sds["size"] = fiInt.Size() if !ctx.IsEnoughSpace(fiInt.Size()) { - ctx.LogE("nncp-xfer", sds, "is not enough space") + ctx.LogE("nncp-xfer", sds, errors.New("is not enough space"), "") fd.Close() continue } @@ -191,14 +205,28 @@ func main() { if err != nil { log.Fatalln(err) } - if _, err = io.CopyN(tmp.W, bufio.NewReader(fd), fiInt.Size()); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "copy") + r, w := io.Pipe() + go func() { + _, err := io.CopyN(w, bufio.NewReader(fd), fiInt.Size()) + if err == nil { + w.Close() + return + } + ctx.LogE("nncp-xfer", sds, err, "copy") + w.CloseWithError(err) + }() + if _, err = nncp.CopyProgressed(tmp.W, r, nncp.SdsAdd(sds, nncp.SDS{ + "pkt": filename, + "fullsize": sds["size"], + }), ctx.ShowPrgrs); err != nil { + ctx.LogE("nncp-xfer", sds, err, "copy") isBad = true - fd.Close() + } + fd.Close() + if isBad { tmp.Cancel() continue } - fd.Close() if err = tmp.Commit(filepath.Join( ctx.Spool, nodeId.String(), @@ -209,7 +237,7 @@ func main() { ctx.LogI("nncp-xfer", sds, "") if !*keep { if err = os.Remove(filename); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "remove") + ctx.LogE("nncp-xfer", sds, err, "remove") isBad = true } } @@ -246,13 +274,13 @@ Tx: } if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mkdir") + ctx.LogE("nncp-xfer", sds, err, "mkdir") isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "stat") + ctx.LogE("nncp-xfer", sds, err, "stat") isBad = true continue } @@ -264,13 +292,13 @@ Tx: if os.IsNotExist(err) { if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mkdir") + ctx.LogE("nncp-xfer", sds, err, "mkdir") isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "stat") + ctx.LogE("nncp-xfer", sds, err, "stat") isBad = true continue } @@ -296,7 +324,7 @@ Tx: } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "mktemp") + ctx.LogE("nncp-xfer", sds, err, "mktemp") job.Fd.Close() isBad = true break @@ -304,45 +332,48 @@ Tx: sds["tmp"] = tmp.Name() ctx.LogD("nncp-xfer", sds, "created") bufW := bufio.NewWriter(tmp) - copied, err := io.Copy(bufW, bufio.NewReader(job.Fd)) + copied, err := nncp.CopyProgressed( + bufW, + bufio.NewReader(job.Fd), + nncp.SdsAdd(sds, nncp.SDS{"fullsize": job.Size}), + ctx.ShowPrgrs, + ) job.Fd.Close() if err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "copy") + ctx.LogE("nncp-xfer", sds, err, "copy") tmp.Close() isBad = true continue } if err = bufW.Flush(); err != nil { tmp.Close() - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "flush") + ctx.LogE("nncp-xfer", sds, err, "flush") isBad = true continue } if err = tmp.Sync(); err != nil { tmp.Close() - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "sync") + ctx.LogE("nncp-xfer", sds, err, "sync") isBad = true continue } tmp.Close() if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "rename") + ctx.LogE("nncp-xfer", sds, err, "rename") isBad = true continue } if err = nncp.DirSync(dstPath); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "sync") + ctx.LogE("nncp-xfer", sds, err, "sync") isBad = true continue } os.Remove(filepath.Join(dstPath, pktName+".part")) delete(sds, "tmp") - ctx.LogI("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{ - "size": strconv.FormatInt(copied, 10), - }), "") + ctx.LogI("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"size": copied}), "") if !*keep { if err = os.Remove(job.Fd.Name()); err != nil { - ctx.LogE("nncp-xfer", nncp.SdsAdd(sds, nncp.SDS{"err": err}), "remove") + ctx.LogE("nncp-xfer", sds, err, "remove") isBad = true } }