2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
32 xdr "github.com/davecgh/go-xdr/xdr2"
33 "github.com/flynn/noise"
34 "golang.org/x/crypto/blake2b"
38 MaxSPSize = 1<<16 - 256
43 type SPCheckerQueues struct {
44 appeared chan *[32]byte
45 checked chan *[32]byte
49 MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
54 SPHaltMarshalized []byte
55 SPPingMarshalized []byte
57 NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
59 noise.CipherChaChaPoly,
63 DefaultDeadline = 10 * time.Second
64 PingTimeout = time.Minute
66 spCheckers = make(map[NodeId]*SPCheckerQueues)
69 type FdAndFullSize struct {
74 type HasherAndOffset struct {
82 SPTypeInfo SPType = iota
83 SPTypeFreq SPType = iota
84 SPTypeFile SPType = iota
85 SPTypeDone SPType = iota
86 SPTypeHalt SPType = iota
87 SPTypePing SPType = iota
120 type FreqWithNice struct {
125 type ConnDeadlined interface {
127 SetReadDeadline(t time.Time) error
128 SetWriteDeadline(t time.Time) error
133 spHead := SPHead{Type: SPTypeHalt}
134 if _, err := xdr.Marshal(&buf, spHead); err != nil {
137 SPHaltMarshalized = make([]byte, SPHeadOverhead)
138 copy(SPHaltMarshalized, buf.Bytes())
141 spHead = SPHead{Type: SPTypePing}
142 if _, err := xdr.Marshal(&buf, spHead); err != nil {
145 SPPingMarshalized = make([]byte, SPHeadOverhead)
146 copy(SPPingMarshalized, buf.Bytes())
149 spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
150 if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153 SPInfoOverhead = buf.Len()
156 spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
157 if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160 SPFreqOverhead = buf.Len()
163 spFile := SPFile{Hash: new([32]byte), Offset: 123}
164 if _, err := xdr.Marshal(&buf, spFile); err != nil {
167 SPFileOverhead = buf.Len()
170 func MarshalSP(typ SPType, sp interface{}) []byte {
172 if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175 if _, err := xdr.Marshal(&buf, sp); err != nil {
181 func payloadsSplit(payloads [][]byte) [][]byte {
182 var outbounds [][]byte
183 outbound := make([]byte, 0, MaxSPSize)
184 for i, payload := range payloads {
185 outbound = append(outbound, payload...)
186 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
187 outbounds = append(outbounds, outbound)
188 outbound = make([]byte, 0, MaxSPSize)
191 if len(outbound) > 0 {
192 outbounds = append(outbounds, outbound)
197 type SPState struct {
202 onlineDeadline time.Duration
203 maxOnlineTime time.Duration
204 hs *noise.HandshakeState
205 csOur *noise.CipherState
206 csTheir *noise.CipherState
209 infosTheir map[[32]byte]*SPInfo
210 infosOurSeen map[[32]byte]uint8
211 queueTheir []*FreqWithNice
215 RxLastNonPing time.Time
218 TxLastNonPing time.Time
220 mustFinishAt time.Time
221 Duration time.Duration
231 onlyPkts map[[32]byte]bool
232 writeSPBuf bytes.Buffer
233 fds map[string]FdAndFullSize
234 fileHashers map[string]*HasherAndOffset
235 checkerQueues SPCheckerQueues
239 func (state *SPState) SetDead() {
244 // Already closed channel, dead
250 for range state.payloads {
254 for range state.pings {
258 for _, s := range state.fds {
264 func (state *SPState) NotAlive() bool {
273 func (state *SPState) dirUnlock() {
274 state.Ctx.UnlockDir(state.rxLock)
275 state.Ctx.UnlockDir(state.txLock)
278 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
279 for hshValue := range appeared {
283 {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
285 ctx.LogD("sp-checker", les, "checking")
286 size, err := ctx.CheckNoCK(nodeId, hshValue)
287 les = append(les, LE{"Size", size})
289 ctx.LogE("sp-checker", les, err, "")
292 ctx.LogI("sp-done", les, "")
293 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
297 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
298 state.writeSPBuf.Reset()
299 n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
306 if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
307 state.TxLastSeen = time.Now()
308 state.TxBytes += int64(n)
310 state.TxLastNonPing = state.TxLastSeen
316 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
318 n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
320 ue := err.(*xdr.UnmarshalError)
321 if ue.Err == io.EOF {
326 state.RxLastSeen = time.Now()
327 state.RxBytes += int64(n)
328 if sp.Magic != MagicNNCPLv1 {
331 return sp.Payload, nil
334 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
337 for job := range ctx.Jobs(nodeId, TTx) {
338 if job.PktEnc.Nice > nice {
341 if _, known := (*seen)[*job.HshValue]; known {
344 totalSize += job.Size
345 infos = append(infos, &SPInfo{
346 Nice: job.PktEnc.Nice,
347 Size: uint64(job.Size),
350 (*seen)[*job.HshValue] = job.PktEnc.Nice
352 sort.Sort(ByNice(infos))
353 var payloads [][]byte
354 for _, info := range infos {
355 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
356 ctx.LogD("sp-info-our", LEs{
358 {"Name", Base32Codec.EncodeToString(info.Hash[:])},
363 ctx.LogI("sp-infos", LEs{
366 {"Pkts", len(payloads)},
370 return payloadsSplit(payloads)
373 func (state *SPState) StartI(conn ConnDeadlined) error {
374 nodeId := state.Node.Id
375 err := state.Ctx.ensureRxDir(nodeId)
380 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
381 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
387 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
388 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
393 started := time.Now()
394 conf := noise.Config{
395 CipherSuite: NoiseCipherSuite,
396 Pattern: noise.HandshakeIK,
398 StaticKeypair: noise.DHKey{
399 Private: state.Ctx.Self.NoisePrv[:],
400 Public: state.Ctx.Self.NoisePub[:],
402 PeerStatic: state.Node.NoisePub[:],
404 hs, err := noise.NewHandshakeState(conf)
409 state.payloads = make(chan []byte)
410 state.pings = make(chan struct{})
411 state.infosTheir = make(map[[32]byte]*SPInfo)
412 state.infosOurSeen = make(map[[32]byte]uint8)
413 state.started = started
414 state.rxLock = rxLock
415 state.txLock = txLock
417 var infosPayloads [][]byte
418 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
419 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
421 var firstPayload []byte
422 if len(infosPayloads) > 0 {
423 firstPayload = infosPayloads[0]
425 // Pad first payload, to hide actual number of existing files
426 for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
427 firstPayload = append(firstPayload, SPHaltMarshalized...)
432 buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
437 les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
438 state.Ctx.LogD("sp-start", les, "sending first message")
439 conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
440 if err = state.WriteSP(conn, buf, false); err != nil {
441 state.Ctx.LogE("sp-start", les, err, "")
445 state.Ctx.LogD("sp-start", les, "waiting for first message")
446 conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
447 if buf, err = state.ReadSP(conn); err != nil {
448 state.Ctx.LogE("sp-start", les, err, "")
452 payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
454 state.Ctx.LogE("sp-start", les, err, "")
458 state.Ctx.LogD("sp-start", les, "starting workers")
459 err = state.StartWorkers(conn, infosPayloads, payload)
461 state.Ctx.LogE("sp-start", les, err, "")
467 func (state *SPState) StartR(conn ConnDeadlined) error {
468 started := time.Now()
469 conf := noise.Config{
470 CipherSuite: NoiseCipherSuite,
471 Pattern: noise.HandshakeIK,
473 StaticKeypair: noise.DHKey{
474 Private: state.Ctx.Self.NoisePrv[:],
475 Public: state.Ctx.Self.NoisePub[:],
478 hs, err := noise.NewHandshakeState(conf)
484 state.payloads = make(chan []byte)
485 state.pings = make(chan struct{})
486 state.infosOurSeen = make(map[[32]byte]uint8)
487 state.infosTheir = make(map[[32]byte]*SPInfo)
488 state.started = started
489 state.xxOnly = xxOnly
493 state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
494 conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
495 if buf, err = state.ReadSP(conn); err != nil {
496 state.Ctx.LogE("sp-start", LEs{}, err, "")
499 if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
500 state.Ctx.LogE("sp-start", LEs{}, err, "")
505 for _, n := range state.Ctx.Neigh {
506 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
512 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
513 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
514 return errors.New("Unknown peer: " + peerId)
517 state.rxRate = node.RxRate
518 state.txRate = node.TxRate
519 state.onlineDeadline = node.OnlineDeadline
520 state.maxOnlineTime = node.MaxOnlineTime
521 les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
523 if err = state.Ctx.ensureRxDir(node.Id); err != nil {
527 if xxOnly == "" || xxOnly == TRx {
528 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
533 state.rxLock = rxLock
535 if xxOnly == "" || xxOnly == TTx {
536 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
541 state.txLock = txLock
543 var infosPayloads [][]byte
544 if xxOnly == "" || xxOnly == TTx {
545 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
547 var firstPayload []byte
548 if len(infosPayloads) > 0 {
549 firstPayload = infosPayloads[0]
551 // Pad first payload, to hide actual number of existing files
552 for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
553 firstPayload = append(firstPayload, SPHaltMarshalized...)
556 state.Ctx.LogD("sp-start", les, "sending first message")
557 buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
562 conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
563 if err = state.WriteSP(conn, buf, false); err != nil {
564 state.Ctx.LogE("sp-start", les, err, "")
568 state.Ctx.LogD("sp-start", les, "starting workers")
569 err = state.StartWorkers(conn, infosPayloads, payload)
576 func (state *SPState) closeFd(pth string) {
577 s, exists := state.fds[pth]
578 delete(state.fds, pth)
584 func (state *SPState) StartWorkers(
586 infosPayloads [][]byte,
589 les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
590 state.fds = make(map[string]FdAndFullSize)
591 state.fileHashers = make(map[string]*HasherAndOffset)
592 state.isDead = make(chan struct{})
593 if state.maxOnlineTime > 0 {
594 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
599 queues := spCheckers[*state.Node.Id]
601 queues = &SPCheckerQueues{
602 appeared: make(chan *[32]byte),
603 checked: make(chan *[32]byte),
605 spCheckers[*state.Node.Id] = queues
606 go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
608 state.checkerQueues = *queues
610 for job := range state.Ctx.JobsNoCK(state.Node.Id) {
611 if job.PktEnc.Nice <= state.Nice {
612 state.checkerQueues.appeared <- job.HshValue
618 defer state.wg.Done()
623 case hsh := <-state.checkerQueues.checked:
624 state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
630 // Remaining handshake payload sending
631 if len(infosPayloads) > 1 {
634 for _, payload := range infosPayloads[1:] {
637 append(les, LE{"Size", len(payload)}),
638 "queuing remaining payload",
640 state.payloads <- payload
646 // Processing of first payload and queueing its responses
649 append(les, LE{"Size", len(payload)}),
650 "processing first payload",
652 replies, err := state.ProcessSP(payload)
654 state.Ctx.LogE("sp-work", les, err, "")
659 for _, reply := range replies {
662 append(les, LE{"Size", len(reply)}),
665 state.payloads <- reply
673 deadlineTicker := time.NewTicker(time.Second)
674 pingTicker := time.NewTicker(PingTimeout)
679 deadlineTicker.Stop()
682 case now := <-deadlineTicker.C:
683 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
684 now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
685 (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
686 (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
688 conn.Close() // #nosec G104
690 case now := <-pingTicker.C:
691 if now.After(state.TxLastSeen.Add(PingTimeout)) {
694 state.pings <- struct{}{}
702 // Spool checker and INFOs sender of appearing files
703 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
706 ticker := time.NewTicker(time.Second)
714 for _, payload := range state.Ctx.infosOur(
721 append(les, LE{"Size", len(payload)}),
724 state.payloads <- payload
735 defer state.SetDead()
736 defer state.wg.Done()
738 if state.NotAlive() {
745 state.Ctx.LogD("sp-xmit", les, "got ping")
746 payload = SPPingMarshalized
748 case payload = <-state.payloads:
751 append(les, LE{"Size", len(payload)}),
756 if len(state.queueTheir) == 0 {
758 time.Sleep(100 * time.Millisecond)
761 freq := state.queueTheir[0].freq
763 if state.txRate > 0 {
764 time.Sleep(time.Second / time.Duration(state.txRate))
766 lesp := append(les, LEs{
768 {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
769 {"Size", int64(freq.Offset)},
771 state.Ctx.LogD("sp-file", lesp, "queueing")
772 pth := filepath.Join(
774 state.Node.Id.String(),
776 Base32Codec.EncodeToString(freq.Hash[:]),
778 fdAndFullSize, exists := state.fds[pth]
780 fd, err := os.Open(pth)
782 state.Ctx.LogE("sp-file", lesp, err, "")
787 state.Ctx.LogE("sp-file", lesp, err, "")
790 fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
791 state.fds[pth] = fdAndFullSize
793 fd := fdAndFullSize.fd
794 fullSize := fdAndFullSize.fullSize
796 if freq.Offset < uint64(fullSize) {
797 state.Ctx.LogD("sp-file", lesp, "seeking")
798 if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
799 state.Ctx.LogE("sp-file", lesp, err, "")
802 buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
803 n, err := fd.Read(buf)
805 state.Ctx.LogE("sp-file", lesp, err, "")
809 state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
812 payload = MarshalSP(SPTypeFile, SPFile{
817 ourSize := freq.Offset + uint64(len(buf))
818 lesp = append(lesp, LE{"Size", int64(ourSize)})
819 lesp = append(lesp, LE{"FullSize", fullSize})
820 if state.Ctx.ShowPrgrs {
824 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
825 if ourSize == uint64(fullSize) {
826 state.Ctx.LogD("sp-file", lesp, "finished")
827 if len(state.queueTheir) > 1 {
828 state.queueTheir = state.queueTheir[1:]
830 state.queueTheir = state.queueTheir[:0]
833 state.queueTheir[0].freq.Offset += uint64(len(buf))
836 state.Ctx.LogD("sp-file", lesp, "queue disappeared")
840 state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
841 conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
842 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
843 state.Ctx.LogE("sp-xmit", les, err, "")
853 if state.NotAlive() {
856 state.Ctx.LogD("sp-recv", les, "waiting for payload")
857 conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
858 payload, err := state.ReadSP(conn)
863 unmarshalErr := err.(*xdr.UnmarshalError)
864 if os.IsTimeout(unmarshalErr.Err) {
867 if unmarshalErr.ErrorCode == xdr.ErrIO {
870 state.Ctx.LogE("sp-recv", les, err, "")
875 append(les, LE{"Size", len(payload)}),
878 payload, err = state.csTheir.Decrypt(nil, nil, payload)
880 state.Ctx.LogE("sp-recv", les, err, "")
885 append(les, LE{"Size", len(payload)}),
888 replies, err := state.ProcessSP(payload)
890 state.Ctx.LogE("sp-recv", les, err, "")
895 for _, reply := range replies {
898 append(les, LE{"Size", len(reply)}),
901 state.payloads <- reply
905 if state.rxRate > 0 {
906 time.Sleep(time.Second / time.Duration(state.rxRate))
912 conn.Close() // #nosec G104
918 func (state *SPState) Wait() {
920 close(state.payloads)
923 state.Duration = time.Now().Sub(state.started)
924 state.RxSpeed = state.RxBytes
925 state.TxSpeed = state.TxBytes
926 rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
927 txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
929 state.RxSpeed = state.RxBytes / rxDuration
932 state.TxSpeed = state.TxBytes / txDuration
936 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
937 les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
938 r := bytes.NewReader(payload)
943 state.Ctx.LogD("sp-process", les, "unmarshaling header")
945 if _, err = xdr.Unmarshal(r, &head); err != nil {
946 state.Ctx.LogE("sp-process", les, err, "")
949 if head.Type != SPTypePing {
950 state.RxLastNonPing = state.RxLastSeen
954 state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
956 state.queueTheir = nil
960 state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
964 lesp := append(les, LE{"Type", "info"})
965 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
967 if _, err = xdr.Unmarshal(r, &info); err != nil {
968 state.Ctx.LogE("sp-process", lesp, err, "")
971 lesp = append(lesp, LEs{
972 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
973 {"Size", int64(info.Size)},
974 {"Nice", int(info.Nice)},
976 if !state.listOnly && info.Nice > state.Nice {
977 state.Ctx.LogD("sp-process", lesp, "too nice")
980 state.Ctx.LogD("sp-process", lesp, "received")
981 if !state.listOnly && state.xxOnly == TTx {
985 state.infosTheir[*info.Hash] = &info
987 state.Ctx.LogD("sp-process", lesp, "stating part")
988 pktPath := filepath.Join(
990 state.Node.Id.String(),
992 Base32Codec.EncodeToString(info.Hash[:]),
994 if _, err = os.Stat(pktPath); err == nil {
995 state.Ctx.LogI("sp-info", lesp, "already done")
997 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1001 if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1002 state.Ctx.LogI("sp-info", lesp, "already seen")
1003 if !state.listOnly {
1004 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1008 if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1009 state.Ctx.LogI("sp-info", lesp, "still non checksummed")
1012 fi, err := os.Stat(pktPath + PartSuffix)
1017 if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1018 state.Ctx.LogI("sp-info", lesp, "not enough space")
1021 state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
1022 if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1023 replies = append(replies, MarshalSP(
1025 SPFreq{info.Hash, uint64(offset)},
1030 lesp := append(les, LE{"Type", "file"})
1031 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1033 if _, err = xdr.Unmarshal(r, &file); err != nil {
1034 state.Ctx.LogE("sp-process", lesp, err, "")
1037 lesp = append(lesp, LEs{
1038 {"XX", string(TRx)},
1039 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
1040 {"Size", len(file.Payload)},
1042 dirToSync := filepath.Join(
1044 state.Node.Id.String(),
1047 filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
1048 filePathPart := filePath + PartSuffix
1049 state.Ctx.LogD("sp-file", lesp, "opening part")
1050 fdAndFullSize, exists := state.fds[filePathPart]
1053 fd = fdAndFullSize.fd
1055 fd, err = os.OpenFile(
1057 os.O_RDWR|os.O_CREATE,
1061 state.Ctx.LogE("sp-file", lesp, err, "")
1064 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1065 if file.Offset == 0 {
1066 h, err := blake2b.New256(nil)
1070 state.fileHashers[filePath] = &HasherAndOffset{h: h}
1073 state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
1074 if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1075 state.Ctx.LogE("sp-file", lesp, err, "")
1076 state.closeFd(filePathPart)
1079 state.Ctx.LogD("sp-file", lesp, "writing")
1080 if _, err = fd.Write(file.Payload); err != nil {
1081 state.Ctx.LogE("sp-file", lesp, err, "")
1082 state.closeFd(filePathPart)
1085 hasherAndOffset, hasherExists := state.fileHashers[filePath]
1087 if hasherAndOffset.offset == file.Offset {
1088 if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1091 hasherAndOffset.offset += uint64(len(file.Payload))
1095 errors.New("offset differs"),
1098 delete(state.fileHashers, filePath)
1099 hasherExists = false
1102 ourSize := int64(file.Offset + uint64(len(file.Payload)))
1103 lesp[len(lesp)-1].V = ourSize
1104 fullsize := int64(0)
1106 infoTheir, ok := state.infosTheir[*file.Hash]
1109 fullsize = int64(infoTheir.Size)
1111 lesp = append(lesp, LE{"FullSize", fullsize})
1112 if state.Ctx.ShowPrgrs {
1113 Progress("Rx", lesp)
1115 if fullsize != ourSize {
1120 state.Ctx.LogE("sp-file", lesp, err, "sync")
1121 state.closeFd(filePathPart)
1125 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1126 state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
1129 if err = os.Rename(filePathPart, filePath); err != nil {
1130 state.Ctx.LogE("sp-file", lesp, err, "rename")
1133 if err = DirSync(dirToSync); err != nil {
1134 state.Ctx.LogE("sp-file", lesp, err, "sync")
1137 state.Ctx.LogI("sp-file", lesp, "done")
1140 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1144 delete(state.infosTheir, *file.Hash)
1146 if !state.Ctx.HdrUsage {
1147 state.closeFd(filePathPart)
1150 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1151 state.Ctx.LogE("sp-file", lesp, err, "seek")
1152 state.closeFd(filePathPart)
1155 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1156 state.closeFd(filePathPart)
1158 state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
1161 state.Ctx.HdrWrite(pktEncRaw, filePath)
1164 state.closeFd(filePathPart)
1165 if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1166 state.Ctx.LogE("sp-file", lesp, err, "rename")
1169 if err = DirSync(dirToSync); err != nil {
1170 state.Ctx.LogE("sp-file", lesp, err, "sync")
1173 state.Ctx.LogI("sp-file", lesp, "downloaded")
1175 delete(state.infosTheir, *file.Hash)
1178 state.checkerQueues.appeared <- file.Hash
1182 lesp := append(les, LE{"Type", "done"})
1183 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1185 if _, err = xdr.Unmarshal(r, &done); err != nil {
1186 state.Ctx.LogE("sp-process", lesp, err, "")
1189 lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1190 lesp = append(lesp, LE{"XX", string(TTx)})
1191 state.Ctx.LogD("sp-done", lesp, "removing")
1192 pth := filepath.Join(
1194 state.Node.Id.String(),
1196 Base32Codec.EncodeToString(done.Hash[:]),
1198 if err = os.Remove(pth); err == nil {
1199 state.Ctx.LogI("sp-done", lesp, "")
1200 if state.Ctx.HdrUsage {
1201 os.Remove(pth + HdrSuffix)
1204 state.Ctx.LogE("sp-done", lesp, err, "")
1208 lesp := append(les, LE{"Type", "freq"})
1209 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1211 if _, err = xdr.Unmarshal(r, &freq); err != nil {
1212 state.Ctx.LogE("sp-process", lesp, err, "")
1215 lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1216 lesp = append(lesp, LE{"Offset", freq.Offset})
1217 state.Ctx.LogD("sp-process", lesp, "queueing")
1218 nice, exists := state.infosOurSeen[*freq.Hash]
1220 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1223 var freqWithNice *FreqWithNice
1224 for insertIdx, freqWithNice = range state.queueTheir {
1225 if freqWithNice.nice > nice {
1229 state.queueTheir = append(state.queueTheir, nil)
1230 copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1231 state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1234 state.Ctx.LogD("sp-process", lesp, "skipping")
1237 state.Ctx.LogD("sp-process", lesp, "unknown")
1243 append(les, LE{"Type", head.Type}),
1244 errors.New("unknown type"),
1247 return nil, BadPktType
1254 for _, info := range state.infosTheir {
1259 state.Ctx.LogI("sp-infos", LEs{
1260 {"XX", string(TRx)},
1261 {"Node", state.Node.Id},
1263 {"Size", int64(size)},
1266 return payloadsSplit(replies), nil