2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
31 xdr "github.com/davecgh/go-xdr/xdr2"
32 "github.com/flynn/noise"
36 MaxSPSize = 1<<16 - 256
42 MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
47 SPHaltMarshalized []byte
48 SPPingMarshalized []byte
50 NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
52 noise.CipherChaChaPoly,
56 DefaultDeadline = 10 * time.Second
57 PingTimeout = time.Minute
60 type FdAndFullSize struct {
68 SPTypeInfo SPType = iota
69 SPTypeFreq SPType = iota
70 SPTypeFile SPType = iota
71 SPTypeDone SPType = iota
72 SPTypeHalt SPType = iota
73 SPTypePing SPType = iota
106 type FreqWithNice struct {
111 type ConnDeadlined interface {
113 SetReadDeadline(t time.Time) error
114 SetWriteDeadline(t time.Time) error
119 spHead := SPHead{Type: SPTypeHalt}
120 if _, err := xdr.Marshal(&buf, spHead); err != nil {
123 SPHaltMarshalized = make([]byte, SPHeadOverhead)
124 copy(SPHaltMarshalized, buf.Bytes())
127 spHead = SPHead{Type: SPTypePing}
128 if _, err := xdr.Marshal(&buf, spHead); err != nil {
131 SPPingMarshalized = make([]byte, SPHeadOverhead)
132 copy(SPPingMarshalized, buf.Bytes())
135 spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
136 if _, err := xdr.Marshal(&buf, spInfo); err != nil {
139 SPInfoOverhead = buf.Len()
142 spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
143 if _, err := xdr.Marshal(&buf, spFreq); err != nil {
146 SPFreqOverhead = buf.Len()
149 spFile := SPFile{Hash: new([32]byte), Offset: 123}
150 if _, err := xdr.Marshal(&buf, spFile); err != nil {
153 SPFileOverhead = buf.Len()
156 func MarshalSP(typ SPType, sp interface{}) []byte {
158 if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
161 if _, err := xdr.Marshal(&buf, sp); err != nil {
167 func payloadsSplit(payloads [][]byte) [][]byte {
168 var outbounds [][]byte
169 outbound := make([]byte, 0, MaxSPSize)
170 for i, payload := range payloads {
171 outbound = append(outbound, payload...)
172 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
173 outbounds = append(outbounds, outbound)
174 outbound = make([]byte, 0, MaxSPSize)
177 if len(outbound) > 0 {
178 outbounds = append(outbounds, outbound)
183 type SPState struct {
188 onlineDeadline time.Duration
189 maxOnlineTime time.Duration
190 hs *noise.HandshakeState
191 csOur *noise.CipherState
192 csTheir *noise.CipherState
195 infosTheir map[[32]byte]*SPInfo
196 infosOurSeen map[[32]byte]uint8
197 queueTheir []*FreqWithNice
201 RxLastNonPing time.Time
204 TxLastNonPing time.Time
206 mustFinishAt time.Time
207 Duration time.Duration
217 onlyPkts map[[32]byte]bool
218 writeSPBuf bytes.Buffer
219 fds map[string]FdAndFullSize
220 checkerJobs chan *[32]byte
224 func (state *SPState) SetDead() {
229 // Already closed channel, dead
235 for range state.payloads {
239 for range state.pings {
243 for _, s := range state.fds {
248 close(state.checkerJobs)
252 func (state *SPState) NotAlive() bool {
261 func (state *SPState) dirUnlock() {
262 state.Ctx.UnlockDir(state.rxLock)
263 state.Ctx.UnlockDir(state.txLock)
266 func (state *SPState) SPChecker() {
267 for hshValue := range state.checkerJobs {
270 {"Node", state.Node.Id},
271 {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
273 state.Ctx.LogD("sp-file", les, "checking")
274 size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
275 les = append(les, LE{"Size", size})
277 state.Ctx.LogE("sp-file", les, err, "")
280 state.Ctx.LogI("sp-done", les, "")
282 go func(hsh *[32]byte) {
283 if !state.NotAlive() {
284 state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
291 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
292 state.writeSPBuf.Reset()
293 n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
300 if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
301 state.TxLastSeen = time.Now()
302 state.TxBytes += int64(n)
304 state.TxLastNonPing = state.TxLastSeen
310 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
312 n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
314 ue := err.(*xdr.UnmarshalError)
315 if ue.Err == io.EOF {
320 state.RxLastSeen = time.Now()
321 state.RxBytes += int64(n)
322 if sp.Magic != MagicNNCPLv1 {
325 return sp.Payload, nil
328 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
331 for job := range ctx.Jobs(nodeId, TTx) {
332 if job.PktEnc.Nice > nice {
335 if _, known := (*seen)[*job.HshValue]; known {
338 totalSize += job.Size
339 infos = append(infos, &SPInfo{
340 Nice: job.PktEnc.Nice,
341 Size: uint64(job.Size),
344 (*seen)[*job.HshValue] = job.PktEnc.Nice
346 sort.Sort(ByNice(infos))
347 var payloads [][]byte
348 for _, info := range infos {
349 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
350 ctx.LogD("sp-info-our", LEs{
352 {"Name", Base32Codec.EncodeToString(info.Hash[:])},
357 ctx.LogI("sp-infos", LEs{
360 {"Pkts", len(payloads)},
364 return payloadsSplit(payloads)
367 func (state *SPState) StartI(conn ConnDeadlined) error {
368 nodeId := state.Node.Id
369 err := state.Ctx.ensureRxDir(nodeId)
374 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
375 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
381 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
382 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
387 started := time.Now()
388 conf := noise.Config{
389 CipherSuite: NoiseCipherSuite,
390 Pattern: noise.HandshakeIK,
392 StaticKeypair: noise.DHKey{
393 Private: state.Ctx.Self.NoisePrv[:],
394 Public: state.Ctx.Self.NoisePub[:],
396 PeerStatic: state.Node.NoisePub[:],
398 hs, err := noise.NewHandshakeState(conf)
403 state.payloads = make(chan []byte)
404 state.pings = make(chan struct{})
405 state.infosTheir = make(map[[32]byte]*SPInfo)
406 state.infosOurSeen = make(map[[32]byte]uint8)
407 state.started = started
408 state.rxLock = rxLock
409 state.txLock = txLock
411 var infosPayloads [][]byte
412 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
413 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
415 var firstPayload []byte
416 if len(infosPayloads) > 0 {
417 firstPayload = infosPayloads[0]
419 // Pad first payload, to hide actual number of existing files
420 for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
421 firstPayload = append(firstPayload, SPHaltMarshalized...)
426 buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
431 les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
432 state.Ctx.LogD("sp-start", les, "sending first message")
433 conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
434 if err = state.WriteSP(conn, buf, false); err != nil {
435 state.Ctx.LogE("sp-start", les, err, "")
439 state.Ctx.LogD("sp-start", les, "waiting for first message")
440 conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
441 if buf, err = state.ReadSP(conn); err != nil {
442 state.Ctx.LogE("sp-start", les, err, "")
446 payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
448 state.Ctx.LogE("sp-start", les, err, "")
452 state.Ctx.LogD("sp-start", les, "starting workers")
453 err = state.StartWorkers(conn, infosPayloads, payload)
455 state.Ctx.LogE("sp-start", les, err, "")
461 func (state *SPState) StartR(conn ConnDeadlined) error {
462 started := time.Now()
463 conf := noise.Config{
464 CipherSuite: NoiseCipherSuite,
465 Pattern: noise.HandshakeIK,
467 StaticKeypair: noise.DHKey{
468 Private: state.Ctx.Self.NoisePrv[:],
469 Public: state.Ctx.Self.NoisePub[:],
472 hs, err := noise.NewHandshakeState(conf)
478 state.payloads = make(chan []byte)
479 state.pings = make(chan struct{})
480 state.infosOurSeen = make(map[[32]byte]uint8)
481 state.infosTheir = make(map[[32]byte]*SPInfo)
482 state.started = started
483 state.xxOnly = xxOnly
487 state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
488 conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
489 if buf, err = state.ReadSP(conn); err != nil {
490 state.Ctx.LogE("sp-start", LEs{}, err, "")
493 if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
494 state.Ctx.LogE("sp-start", LEs{}, err, "")
499 for _, n := range state.Ctx.Neigh {
500 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
506 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
507 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
508 return errors.New("Unknown peer: " + peerId)
511 state.rxRate = node.RxRate
512 state.txRate = node.TxRate
513 state.onlineDeadline = node.OnlineDeadline
514 state.maxOnlineTime = node.MaxOnlineTime
515 les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
517 if err = state.Ctx.ensureRxDir(node.Id); err != nil {
521 if xxOnly == "" || xxOnly == TRx {
522 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
527 state.rxLock = rxLock
529 if xxOnly == "" || xxOnly == TTx {
530 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
535 state.txLock = txLock
537 var infosPayloads [][]byte
538 if xxOnly == "" || xxOnly == TTx {
539 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
541 var firstPayload []byte
542 if len(infosPayloads) > 0 {
543 firstPayload = infosPayloads[0]
545 // Pad first payload, to hide actual number of existing files
546 for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
547 firstPayload = append(firstPayload, SPHaltMarshalized...)
550 state.Ctx.LogD("sp-start", les, "sending first message")
551 buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
556 conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
557 if err = state.WriteSP(conn, buf, false); err != nil {
558 state.Ctx.LogE("sp-start", les, err, "")
562 state.Ctx.LogD("sp-start", les, "starting workers")
563 err = state.StartWorkers(conn, infosPayloads, payload)
570 func (state *SPState) closeFd(pth string) {
571 s, exists := state.fds[pth]
572 delete(state.fds, pth)
578 func (state *SPState) FillExistingNoCK() {
579 checkerJobs := make([]*[32]byte, 0)
580 for job := range state.Ctx.JobsNoCK(state.Node.Id) {
581 if job.PktEnc.Nice > state.Nice {
584 checkerJobs = append(checkerJobs, job.HshValue)
586 for _, job := range checkerJobs {
587 state.checkerJobs <- job
592 func (state *SPState) StartWorkers(
594 infosPayloads [][]byte,
597 les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
598 state.fds = make(map[string]FdAndFullSize)
599 state.isDead = make(chan struct{})
600 if state.maxOnlineTime > 0 {
601 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
606 state.checkerJobs = make(chan *[32]byte)
609 go state.FillExistingNoCK()
612 // Remaining handshake payload sending
613 if len(infosPayloads) > 1 {
616 for _, payload := range infosPayloads[1:] {
619 append(les, LE{"Size", len(payload)}),
620 "queuing remaining payload",
622 state.payloads <- payload
628 // Processing of first payload and queueing its responses
631 append(les, LE{"Size", len(payload)}),
632 "processing first payload",
634 replies, err := state.ProcessSP(payload)
636 state.Ctx.LogE("sp-work", les, err, "")
641 for _, reply := range replies {
644 append(les, LE{"Size", len(reply)}),
647 state.payloads <- reply
655 deadlineTicker := time.NewTicker(time.Second)
656 pingTicker := time.NewTicker(PingTimeout)
661 deadlineTicker.Stop()
664 case now := <-deadlineTicker.C:
665 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
666 now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
667 (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
668 (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
670 conn.Close() // #nosec G104
672 case now := <-pingTicker.C:
673 if now.After(state.TxLastSeen.Add(PingTimeout)) {
676 state.pings <- struct{}{}
684 // Spool checker and INFOs sender of appearing files
685 if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
688 ticker := time.NewTicker(time.Second)
696 for _, payload := range state.Ctx.infosOur(
703 append(les, LE{"Size", len(payload)}),
706 state.payloads <- payload
717 defer state.SetDead()
718 defer state.wg.Done()
720 if state.NotAlive() {
727 state.Ctx.LogD("sp-xmit", les, "got ping")
728 payload = SPPingMarshalized
730 case payload = <-state.payloads:
733 append(les, LE{"Size", len(payload)}),
738 if len(state.queueTheir) == 0 {
740 time.Sleep(100 * time.Millisecond)
743 freq := state.queueTheir[0].freq
745 if state.txRate > 0 {
746 time.Sleep(time.Second / time.Duration(state.txRate))
748 lesp := append(les, LEs{
750 {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
751 {"Size", int64(freq.Offset)},
753 state.Ctx.LogD("sp-file", lesp, "queueing")
754 pth := filepath.Join(
756 state.Node.Id.String(),
758 Base32Codec.EncodeToString(freq.Hash[:]),
760 fdAndFullSize, exists := state.fds[pth]
762 fd, err := os.Open(pth)
764 state.Ctx.LogE("sp-file", lesp, err, "")
769 state.Ctx.LogE("sp-file", lesp, err, "")
772 fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
773 state.fds[pth] = fdAndFullSize
775 fd := fdAndFullSize.fd
776 fullSize := fdAndFullSize.fullSize
778 if freq.Offset < uint64(fullSize) {
779 state.Ctx.LogD("sp-file", lesp, "seeking")
780 if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
781 state.Ctx.LogE("sp-file", lesp, err, "")
784 buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
785 n, err := fd.Read(buf)
787 state.Ctx.LogE("sp-file", lesp, err, "")
791 state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
794 payload = MarshalSP(SPTypeFile, SPFile{
799 ourSize := freq.Offset + uint64(len(buf))
800 lesp = append(lesp, LE{"Size", int64(ourSize)})
801 lesp = append(lesp, LE{"FullSize", fullSize})
802 if state.Ctx.ShowPrgrs {
806 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
807 if ourSize == uint64(fullSize) {
808 state.Ctx.LogD("sp-file", lesp, "finished")
809 if len(state.queueTheir) > 1 {
810 state.queueTheir = state.queueTheir[1:]
812 state.queueTheir = state.queueTheir[:0]
815 state.queueTheir[0].freq.Offset += uint64(len(buf))
818 state.Ctx.LogD("sp-file", lesp, "queue disappeared")
822 state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
823 conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
824 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
825 state.Ctx.LogE("sp-xmit", les, err, "")
835 if state.NotAlive() {
838 state.Ctx.LogD("sp-recv", les, "waiting for payload")
839 conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
840 payload, err := state.ReadSP(conn)
845 unmarshalErr := err.(*xdr.UnmarshalError)
846 if os.IsTimeout(unmarshalErr.Err) {
849 if unmarshalErr.ErrorCode == xdr.ErrIO {
852 state.Ctx.LogE("sp-recv", les, err, "")
857 append(les, LE{"Size", len(payload)}),
860 payload, err = state.csTheir.Decrypt(nil, nil, payload)
862 state.Ctx.LogE("sp-recv", les, err, "")
867 append(les, LE{"Size", len(payload)}),
870 replies, err := state.ProcessSP(payload)
872 state.Ctx.LogE("sp-recv", les, err, "")
877 for _, reply := range replies {
880 append(les, LE{"Size", len(reply)}),
883 state.payloads <- reply
887 if state.rxRate > 0 {
888 time.Sleep(time.Second / time.Duration(state.rxRate))
894 conn.Close() // #nosec G104
900 func (state *SPState) Wait() {
902 close(state.payloads)
905 state.Duration = time.Now().Sub(state.started)
906 state.RxSpeed = state.RxBytes
907 state.TxSpeed = state.TxBytes
908 rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
909 txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
911 state.RxSpeed = state.RxBytes / rxDuration
914 state.TxSpeed = state.TxBytes / txDuration
918 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
919 les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
920 r := bytes.NewReader(payload)
925 state.Ctx.LogD("sp-process", les, "unmarshaling header")
927 if _, err = xdr.Unmarshal(r, &head); err != nil {
928 state.Ctx.LogE("sp-process", les, err, "")
931 if head.Type != SPTypePing {
932 state.RxLastNonPing = state.RxLastSeen
936 state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
938 state.queueTheir = nil
941 state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
944 lesp := append(les, LE{"Type", "info"})
945 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
947 if _, err = xdr.Unmarshal(r, &info); err != nil {
948 state.Ctx.LogE("sp-process", lesp, err, "")
951 lesp = append(lesp, LEs{
952 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
953 {"Size", int64(info.Size)},
954 {"Nice", int(info.Nice)},
956 if !state.listOnly && info.Nice > state.Nice {
957 state.Ctx.LogD("sp-process", lesp, "too nice")
960 state.Ctx.LogD("sp-process", lesp, "received")
961 if !state.listOnly && state.xxOnly == TTx {
965 state.infosTheir[*info.Hash] = &info
967 state.Ctx.LogD("sp-process", lesp, "stating part")
968 pktPath := filepath.Join(
970 state.Node.Id.String(),
972 Base32Codec.EncodeToString(info.Hash[:]),
974 if _, err = os.Stat(pktPath); err == nil {
975 state.Ctx.LogI("sp-info", lesp, "already done")
977 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
981 if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
982 state.Ctx.LogI("sp-info", lesp, "already seen")
984 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
988 if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
989 state.Ctx.LogI("sp-info", lesp, "still non checksummed")
992 fi, err := os.Stat(pktPath + PartSuffix)
997 if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
998 state.Ctx.LogI("sp-info", lesp, "not enough space")
1001 state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
1002 if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1003 replies = append(replies, MarshalSP(
1005 SPFreq{info.Hash, uint64(offset)},
1009 lesp := append(les, LE{"Type", "file"})
1010 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1012 if _, err = xdr.Unmarshal(r, &file); err != nil {
1013 state.Ctx.LogE("sp-process", lesp, err, "")
1016 lesp = append(lesp, LEs{
1017 {"XX", string(TRx)},
1018 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
1019 {"Size", len(file.Payload)},
1021 dirToSync := filepath.Join(
1023 state.Node.Id.String(),
1026 filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
1027 filePathPart := filePath + PartSuffix
1028 state.Ctx.LogD("sp-file", lesp, "opening part")
1029 fdAndFullSize, exists := state.fds[filePathPart]
1032 fd = fdAndFullSize.fd
1034 fd, err = os.OpenFile(
1036 os.O_RDWR|os.O_CREATE,
1040 state.Ctx.LogE("sp-file", lesp, err, "")
1043 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1045 state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
1046 if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1047 state.Ctx.LogE("sp-file", lesp, err, "")
1048 state.closeFd(filePathPart)
1051 state.Ctx.LogD("sp-file", lesp, "writing")
1052 if _, err = fd.Write(file.Payload); err != nil {
1053 state.Ctx.LogE("sp-file", lesp, err, "")
1054 state.closeFd(filePathPart)
1057 ourSize := int64(file.Offset + uint64(len(file.Payload)))
1058 lesp[len(lesp)-1].V = ourSize
1059 fullsize := int64(0)
1061 infoTheir, ok := state.infosTheir[*file.Hash]
1064 fullsize = int64(infoTheir.Size)
1066 lesp = append(lesp, LE{"FullSize", fullsize})
1067 if state.Ctx.ShowPrgrs {
1068 Progress("Rx", lesp)
1070 if fullsize != ourSize {
1074 state.closeFd(filePathPart)
1076 state.Ctx.LogE("sp-file", lesp, err, "sync")
1079 if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1080 state.Ctx.LogE("sp-file", lesp, err, "rename")
1083 if err = DirSync(dirToSync); err != nil {
1084 state.Ctx.LogE("sp-file", lesp, err, "sync")
1087 state.Ctx.LogI("sp-file", lesp, "downloaded")
1089 delete(state.infosTheir, *file.Hash)
1092 state.checkerJobs <- file.Hash
1095 lesp := append(les, LE{"Type", "done"})
1096 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1098 if _, err = xdr.Unmarshal(r, &done); err != nil {
1099 state.Ctx.LogE("sp-process", lesp, err, "")
1102 lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1103 state.Ctx.LogD("sp-done", lesp, "removing")
1104 err := os.Remove(filepath.Join(
1106 state.Node.Id.String(),
1108 Base32Codec.EncodeToString(done.Hash[:]),
1110 lesp = append(lesp, LE{"XX", string(TTx)})
1112 state.Ctx.LogI("sp-done", lesp, "")
1114 state.Ctx.LogE("sp-done", lesp, err, "")
1117 lesp := append(les, LE{"Type", "freq"})
1118 state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1120 if _, err = xdr.Unmarshal(r, &freq); err != nil {
1121 state.Ctx.LogE("sp-process", lesp, err, "")
1124 lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1125 lesp = append(lesp, LE{"Offset", freq.Offset})
1126 state.Ctx.LogD("sp-process", lesp, "queueing")
1127 nice, exists := state.infosOurSeen[*freq.Hash]
1129 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1132 var freqWithNice *FreqWithNice
1133 for insertIdx, freqWithNice = range state.queueTheir {
1134 if freqWithNice.nice > nice {
1138 state.queueTheir = append(state.queueTheir, nil)
1139 copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1140 state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1143 state.Ctx.LogD("sp-process", lesp, "skipping")
1146 state.Ctx.LogD("sp-process", lesp, "unknown")
1151 append(les, LE{"Type", head.Type}),
1152 errors.New("unknown type"),
1155 return nil, BadPktType
1162 for _, info := range state.infosTheir {
1167 state.Ctx.LogI("sp-infos", LEs{
1168 {"XX", string(TRx)},
1169 {"Node", state.Node.Id},
1171 {"Size", int64(size)},
1174 return payloadsSplit(replies), nil