Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Use typed time.Duration instead of raw uint for maxOnlineTime
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "net"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34 )
35
const (
	// MaxSPSize is the size limit, in bytes, used in this file when
	// batching marshaled payloads into one outbound message and when
	// padding the first handshake payload.
	MaxSPSize       = 1<<16 - 256
	// PartSuffix is a file name suffix. It is not referenced in this part
	// of the file; presumably it marks partially received packets --
	// confirm at its use sites.
	PartSuffix      = ".part"
	// DefaultDeadline is the number of seconds added to the current time
	// for every connection read/write deadline set in this file.
	DefaultDeadline = 10

	// SPHeadOverhead is the number of bytes reserved for one marshaled
	// SPHead (see how SPHaltMarshalized is sized in init).
	SPHeadOverhead = 4
)
43
var (
	// MagicNNCPLv1 is the magic value placed in every SPRaw frame by
	// WriteSP and verified by ReadSP.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// SPInfoOverhead, SPFreqOverhead and SPFileOverhead are the XDR-encoded
	// sizes of SPInfo, SPFreq and an SPFile with an empty Payload; they are
	// measured once in init.
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// SPHaltMarshalized is the XDR-encoded SPHead{Type: SPTypeHalt},
	// prepared in init and used as padding for the first payload.
	SPHaltMarshalized []byte

	// NoiseCipherSuite is the Noise cipher suite used for the handshake
	// configuration in StartI and StartR.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// spWorkersGroup is not referenced in this part of the file.
	// NOTE(review): presumably tracks workers across sessions -- confirm.
	spWorkersGroup sync.WaitGroup
)
60
// SPType identifies the kind of packet that follows an SPHead.
type SPType uint8

// Known packet kinds. The numeric values are part of the encoded
// SPHead, so their order must not change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
)
70
// SPHead is the header marshaled in front of every packet by MarshalSP;
// it carries only the packet type.
type SPHead struct {
	Type SPType
}

// SPInfo announces a packet: its niceness, its size and its hash
// (see infosOur, which builds one per outbound job).
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}

// SPFreq identifies a packet by hash together with a byte offset. The
// sender goroutine in StartWorkers serves queued SPFreq entries by
// reading the corresponding file starting at Offset.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}

// SPFile carries a chunk of packet data: Payload holds the bytes read
// from the file identified by Hash, starting at Offset.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}

// SPDone carries only a packet hash.
// NOTE(review): presumably acknowledges a fully received packet; its
// handling is outside this part of the file -- confirm.
type SPDone struct {
	Hash *[32]byte
}

// SPRaw is the outermost frame written by WriteSP and read by ReadSP:
// the protocol magic followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
100
// FreqWithNice pairs an SPFreq with a niceness value; SPState.queueTheir
// is a slice of these. How nice is used is outside this part of the file.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
105
// ConnDeadlined is the connection abstraction used by the session code:
// a read/write/close stream that also supports separate read and write
// deadlines (net.Conn provides these methods).
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
111
112 func init() {
113         var buf bytes.Buffer
114         spHead := SPHead{Type: SPTypeHalt}
115         if _, err := xdr.Marshal(&buf, spHead); err != nil {
116                 panic(err)
117         }
118         SPHaltMarshalized = make([]byte, SPHeadOverhead)
119         copy(SPHaltMarshalized, buf.Bytes())
120         buf.Reset()
121
122         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
123         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
124                 panic(err)
125         }
126         SPInfoOverhead = buf.Len()
127         buf.Reset()
128
129         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
130         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
131                 panic(err)
132         }
133         SPFreqOverhead = buf.Len()
134         buf.Reset()
135
136         spFile := SPFile{Hash: new([32]byte), Offset: 123}
137         if _, err := xdr.Marshal(&buf, spFile); err != nil {
138                 panic(err)
139         }
140         SPFileOverhead = buf.Len()
141 }
142
143 func MarshalSP(typ SPType, sp interface{}) []byte {
144         var buf bytes.Buffer
145         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
146                 panic(err)
147         }
148         if _, err := xdr.Marshal(&buf, sp); err != nil {
149                 panic(err)
150         }
151         return buf.Bytes()
152 }
153
154 func payloadsSplit(payloads [][]byte) [][]byte {
155         var outbounds [][]byte
156         outbound := make([]byte, 0, MaxSPSize)
157         for i, payload := range payloads {
158                 outbound = append(outbound, payload...)
159                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
160                         outbounds = append(outbounds, outbound)
161                         outbound = make([]byte, 0, MaxSPSize)
162                 }
163         }
164         if len(outbound) > 0 {
165                 outbounds = append(outbounds, outbound)
166         }
167         return outbounds
168 }
169
// SPState holds everything belonging to one synchronization session with
// a single remote node: configuration, Noise handshake/cipher state,
// queues, traffic statistics and directory locks. The embedded RWMutex
// guards queueTheir and serializes SetDead.
type SPState struct {
	Ctx            *Ctx  // shared context: logging, spool access, dir locking
	Node           *Node // remote node; StartR sets it after identifying the peer
	Nice           uint8 // jobs with a higher nice value are not announced
	onlineDeadline uint  // seconds of both Rx and Tx inactivity after which NotAlive reports true
	maxOnlineTime  time.Duration // if > 0, NotAlive reports true once this long has passed since started
	hs             *noise.HandshakeState
	csOur          *noise.CipherState // encrypts outgoing payloads
	csTheir        *noise.CipherState // decrypts incoming payloads
	payloads       chan []byte        // marshaled payloads waiting to be sent
	infosTheir     map[[32]byte]*SPInfo // NOTE(review): presumably infos announced by the peer; filled outside this part of the file
	infosOurSeen   map[[32]byte]uint8   // hashes of our jobs already announced, mapped to their nice
	queueTheir     []*FreqWithNice      // file requests served by the sender goroutine
	wg             sync.WaitGroup       // tracks the worker goroutines; see Wait
	RxBytes        int64                // updated by ReadSP
	RxLastSeen     time.Time            // updated by ReadSP
	TxBytes        int64                // updated by WriteSP
	TxLastSeen     time.Time            // updated by WriteSP
	started        time.Time            // session start, set in StartI/StartR
	Duration       time.Duration        // computed in Wait
	RxSpeed        int64                // computed in Wait
	TxSpeed        int64                // computed in Wait
	rxLock         *os.File             // rx directory lock, released by dirUnlock
	txLock         *os.File             // tx directory lock, released by dirUnlock
	xxOnly         TRxTx                // restricts the session to TRx or TTx; empty means both
	rxRate         int                  // if > 0, receiver sleeps time.Second/rxRate after each payload
	txRate         int                  // if > 0, sender sleeps time.Second/txRate before each file chunk
	isDead         chan struct{}        // closed by SetDead
	listOnly       bool                 // when set, StartI takes no locks and announces nothing
	onlyPkts       map[[32]byte]bool    // not referenced in this part of the file
	writeSPBuf     bytes.Buffer         // scratch buffer reused by WriteSP
	sync.RWMutex
}
203
204 func (state *SPState) SetDead() {
205         state.Lock()
206         defer state.Unlock()
207         select {
208         case _, ok := <-state.isDead:
209                 if !ok {
210                         // Already closed channel, dead
211                         return
212                 }
213         default:
214         }
215         close(state.isDead)
216         go func() {
217                 for _ = range state.payloads {
218                 }
219         }()
220 }
221
222 func (state *SPState) NotAlive() bool {
223         select {
224         case _, ok := <-state.isDead:
225                 if !ok {
226                         return true
227                 }
228         default:
229         }
230         now := time.Now()
231         if state.maxOnlineTime > 0 && state.started.Add(state.maxOnlineTime).Before(now) {
232                 return true
233         }
234         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
235                 uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
236 }
237
238 func (state *SPState) dirUnlock() {
239         state.Ctx.UnlockDir(state.rxLock)
240         state.Ctx.UnlockDir(state.txLock)
241 }
242
243 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
244         state.writeSPBuf.Reset()
245         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
246                 Magic:   MagicNNCPLv1,
247                 Payload: payload,
248         })
249         if err != nil {
250                 return err
251         }
252         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
253                 state.TxLastSeen = time.Now()
254                 state.TxBytes += int64(n)
255         }
256         return err
257 }
258
259 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
260         var sp SPRaw
261         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
262         if err != nil {
263                 ue := err.(*xdr.UnmarshalError)
264                 if ue.Err == io.EOF {
265                         return nil, ue.Err
266                 }
267                 return nil, err
268         }
269         state.RxLastSeen = time.Now()
270         state.RxBytes += int64(n)
271         if sp.Magic != MagicNNCPLv1 {
272                 return nil, BadMagic
273         }
274         return sp.Payload, nil
275 }
276
277 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
278         var infos []*SPInfo
279         var totalSize int64
280         for job := range ctx.Jobs(nodeId, TTx) {
281                 job.Fd.Close()
282                 if job.PktEnc.Nice > nice {
283                         continue
284                 }
285                 if _, known := (*seen)[*job.HshValue]; known {
286                         continue
287                 }
288                 totalSize += job.Size
289                 infos = append(infos, &SPInfo{
290                         Nice: job.PktEnc.Nice,
291                         Size: uint64(job.Size),
292                         Hash: job.HshValue,
293                 })
294                 (*seen)[*job.HshValue] = job.PktEnc.Nice
295         }
296         sort.Sort(ByNice(infos))
297         var payloads [][]byte
298         for _, info := range infos {
299                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
300                 ctx.LogD("sp-info-our", SDS{
301                         "node": nodeId,
302                         "name": ToBase32(info.Hash[:]),
303                         "size": info.Size,
304                 }, "")
305         }
306         if totalSize > 0 {
307                 ctx.LogI("sp-infos", SDS{
308                         "xx":   string(TTx),
309                         "node": nodeId,
310                         "pkts": len(payloads),
311                         "size": totalSize,
312                 }, "")
313         }
314         return payloadsSplit(payloads)
315 }
316
317 func (state *SPState) StartI(conn ConnDeadlined) error {
318         nodeId := state.Node.Id
319         err := state.Ctx.ensureRxDir(nodeId)
320         if err != nil {
321                 return err
322         }
323         var rxLock *os.File
324         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
325                 rxLock, err = state.Ctx.LockDir(nodeId, TRx)
326                 if err != nil {
327                         return err
328                 }
329         }
330         var txLock *os.File
331         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
332                 txLock, err = state.Ctx.LockDir(nodeId, TTx)
333                 if err != nil {
334                         return err
335                 }
336         }
337         started := time.Now()
338         conf := noise.Config{
339                 CipherSuite: NoiseCipherSuite,
340                 Pattern:     noise.HandshakeIK,
341                 Initiator:   true,
342                 StaticKeypair: noise.DHKey{
343                         Private: state.Ctx.Self.NoisePrv[:],
344                         Public:  state.Ctx.Self.NoisePub[:],
345                 },
346                 PeerStatic: state.Node.NoisePub[:],
347         }
348         hs, err := noise.NewHandshakeState(conf)
349         if err != nil {
350                 return err
351         }
352         state.hs = hs
353         state.payloads = make(chan []byte)
354         state.infosTheir = make(map[[32]byte]*SPInfo)
355         state.infosOurSeen = make(map[[32]byte]uint8)
356         state.started = started
357         state.rxLock = rxLock
358         state.txLock = txLock
359
360         var infosPayloads [][]byte
361         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
362                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
363         }
364         var firstPayload []byte
365         if len(infosPayloads) > 0 {
366                 firstPayload = infosPayloads[0]
367         }
368         // Pad first payload, to hide actual number of existing files
369         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
370                 firstPayload = append(firstPayload, SPHaltMarshalized...)
371         }
372
373         var buf []byte
374         var payload []byte
375         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
376         if err != nil {
377                 state.dirUnlock()
378                 return err
379         }
380         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
381         state.Ctx.LogD("sp-start", sds, "sending first message")
382         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
383         if err = state.WriteSP(conn, buf); err != nil {
384                 state.Ctx.LogE("sp-start", sds, err, "")
385                 state.dirUnlock()
386                 return err
387         }
388         state.Ctx.LogD("sp-start", sds, "waiting for first message")
389         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
390         if buf, err = state.ReadSP(conn); err != nil {
391                 state.Ctx.LogE("sp-start", sds, err, "")
392                 state.dirUnlock()
393                 return err
394         }
395         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
396         if err != nil {
397                 state.Ctx.LogE("sp-start", sds, err, "")
398                 state.dirUnlock()
399                 return err
400         }
401         state.Ctx.LogD("sp-start", sds, "starting workers")
402         err = state.StartWorkers(conn, infosPayloads, payload)
403         if err != nil {
404                 state.Ctx.LogE("sp-start", sds, err, "")
405                 state.dirUnlock()
406         }
407         return err
408 }
409
410 func (state *SPState) StartR(conn ConnDeadlined) error {
411         started := time.Now()
412         conf := noise.Config{
413                 CipherSuite: NoiseCipherSuite,
414                 Pattern:     noise.HandshakeIK,
415                 Initiator:   false,
416                 StaticKeypair: noise.DHKey{
417                         Private: state.Ctx.Self.NoisePrv[:],
418                         Public:  state.Ctx.Self.NoisePub[:],
419                 },
420         }
421         hs, err := noise.NewHandshakeState(conf)
422         if err != nil {
423                 return err
424         }
425         xxOnly := TRxTx("")
426         state.hs = hs
427         state.payloads = make(chan []byte)
428         state.infosOurSeen = make(map[[32]byte]uint8)
429         state.infosTheir = make(map[[32]byte]*SPInfo)
430         state.started = started
431         state.xxOnly = xxOnly
432         var buf []byte
433         var payload []byte
434         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
435         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
436         if buf, err = state.ReadSP(conn); err != nil {
437                 state.Ctx.LogE("sp-start", SDS{}, err, "")
438                 return err
439         }
440         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
441                 state.Ctx.LogE("sp-start", SDS{}, err, "")
442                 return err
443         }
444
445         var node *Node
446         for _, n := range state.Ctx.Neigh {
447                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
448                         node = n
449                         break
450                 }
451         }
452         if node == nil {
453                 peerId := ToBase32(state.hs.PeerStatic())
454                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
455                 return errors.New("Unknown peer: " + peerId)
456         }
457         state.Node = node
458         state.rxRate = node.RxRate
459         state.txRate = node.TxRate
460         state.onlineDeadline = node.OnlineDeadline
461         state.maxOnlineTime = node.MaxOnlineTime
462         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
463
464         if state.Ctx.ensureRxDir(node.Id); err != nil {
465                 return err
466         }
467         var rxLock *os.File
468         if xxOnly == "" || xxOnly == TRx {
469                 rxLock, err = state.Ctx.LockDir(node.Id, TRx)
470                 if err != nil {
471                         return err
472                 }
473         }
474         state.rxLock = rxLock
475         var txLock *os.File
476         if xxOnly == "" || xxOnly == TTx {
477                 txLock, err = state.Ctx.LockDir(node.Id, TTx)
478                 if err != nil {
479                         return err
480                 }
481         }
482         state.txLock = txLock
483
484         var infosPayloads [][]byte
485         if xxOnly == "" || xxOnly == TTx {
486                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
487         }
488         var firstPayload []byte
489         if len(infosPayloads) > 0 {
490                 firstPayload = infosPayloads[0]
491         }
492         // Pad first payload, to hide actual number of existing files
493         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
494                 firstPayload = append(firstPayload, SPHaltMarshalized...)
495         }
496
497         state.Ctx.LogD("sp-start", sds, "sending first message")
498         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
499         if err != nil {
500                 state.dirUnlock()
501                 return err
502         }
503         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
504         if err = state.WriteSP(conn, buf); err != nil {
505                 state.Ctx.LogE("sp-start", sds, err, "")
506                 state.dirUnlock()
507                 return err
508         }
509         state.Ctx.LogD("sp-start", sds, "starting workers")
510         err = state.StartWorkers(conn, infosPayloads, payload)
511         if err != nil {
512                 state.dirUnlock()
513         }
514         return err
515 }
516
517 func (state *SPState) StartWorkers(
518         conn ConnDeadlined,
519         infosPayloads [][]byte,
520         payload []byte,
521 ) error {
522         state.isDead = make(chan struct{})
523         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
524
525         if len(infosPayloads) > 1 {
526                 state.wg.Add(1)
527                 go func() {
528                         for _, payload := range infosPayloads[1:] {
529                                 state.Ctx.LogD(
530                                         "sp-work",
531                                         SdsAdd(sds, SDS{"size": len(payload)}),
532                                         "queuing remaining payload",
533                                 )
534                                 state.payloads <- payload
535                         }
536                         state.wg.Done()
537                 }()
538         }
539         state.Ctx.LogD(
540                 "sp-work",
541                 SdsAdd(sds, SDS{"size": len(payload)}),
542                 "processing first payload",
543         )
544         replies, err := state.ProcessSP(payload)
545         if err != nil {
546                 state.Ctx.LogE("sp-work", sds, err, "")
547                 return err
548         }
549
550         state.wg.Add(1)
551         go func() {
552                 for _, reply := range replies {
553                         state.Ctx.LogD(
554                                 "sp-work",
555                                 SdsAdd(sds, SDS{"size": len(reply)}),
556                                 "queuing reply",
557                         )
558                         state.payloads <- reply
559                 }
560                 state.wg.Done()
561         }()
562
563         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
564                 state.wg.Add(1)
565                 go func() {
566                         ticker := time.NewTicker(time.Second)
567                         for {
568                                 select {
569                                 case _, ok := <-state.isDead:
570                                         if !ok {
571                                                 state.wg.Done()
572                                                 ticker.Stop()
573                                                 return
574                                         }
575                                 case <-ticker.C:
576                                         for _, payload := range state.Ctx.infosOur(
577                                                 state.Node.Id,
578                                                 state.Nice,
579                                                 &state.infosOurSeen,
580                                         ) {
581                                                 state.Ctx.LogD(
582                                                         "sp-work",
583                                                         SdsAdd(sds, SDS{"size": len(payload)}),
584                                                         "queuing new info",
585                                                 )
586                                                 state.payloads <- payload
587                                         }
588                                 }
589                         }
590                 }()
591         }
592
593         state.wg.Add(1)
594         go func() {
595                 for {
596                         if state.NotAlive() {
597                                 break
598                         }
599                         var payload []byte
600                         select {
601                         case payload = <-state.payloads:
602                                 state.Ctx.LogD(
603                                         "sp-xmit",
604                                         SdsAdd(sds, SDS{"size": len(payload)}),
605                                         "got payload",
606                                 )
607                         default:
608                         }
609                         if payload == nil {
610                                 state.RLock()
611                                 if len(state.queueTheir) == 0 {
612                                         state.RUnlock()
613                                         time.Sleep(100 * time.Millisecond)
614                                         continue
615                                 }
616                                 freq := state.queueTheir[0].freq
617                                 state.RUnlock()
618
619                                 if state.txRate > 0 {
620                                         time.Sleep(time.Second / time.Duration(state.txRate))
621                                 }
622
623                                 sdsp := SdsAdd(sds, SDS{
624                                         "xx":   string(TTx),
625                                         "pkt":  ToBase32(freq.Hash[:]),
626                                         "size": int64(freq.Offset),
627                                 })
628                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
629                                 fd, err := os.Open(filepath.Join(
630                                         state.Ctx.Spool,
631                                         state.Node.Id.String(),
632                                         string(TTx),
633                                         ToBase32(freq.Hash[:]),
634                                 ))
635                                 if err != nil {
636                                         state.Ctx.LogE("sp-file", sdsp, err, "")
637                                         break
638                                 }
639                                 fi, err := fd.Stat()
640                                 if err != nil {
641                                         state.Ctx.LogE("sp-file", sdsp, err, "")
642                                         break
643                                 }
644                                 fullSize := fi.Size()
645                                 var buf []byte
646                                 if freq.Offset < uint64(fullSize) {
647                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
648                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
649                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
650                                                 break
651                                         }
652                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
653                                         n, err := fd.Read(buf)
654                                         if err != nil {
655                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
656                                                 break
657                                         }
658                                         buf = buf[:n]
659                                         state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
660                                 }
661                                 fd.Close()
662                                 payload = MarshalSP(SPTypeFile, SPFile{
663                                         Hash:    freq.Hash,
664                                         Offset:  freq.Offset,
665                                         Payload: buf,
666                                 })
667                                 ourSize := freq.Offset + uint64(len(buf))
668                                 sdsp["size"] = int64(ourSize)
669                                 sdsp["fullsize"] = fullSize
670                                 if state.Ctx.ShowPrgrs {
671                                         Progress("Tx", sdsp)
672                                 }
673                                 state.Lock()
674                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
675                                         if ourSize == uint64(fullSize) {
676                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
677                                                 if len(state.queueTheir) > 1 {
678                                                         state.queueTheir = state.queueTheir[1:]
679                                                 } else {
680                                                         state.queueTheir = state.queueTheir[:0]
681                                                 }
682                                         } else {
683                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
684                                         }
685                                 } else {
686                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
687                                 }
688                                 state.Unlock()
689                         }
690                         state.Ctx.LogD(
691                                 "sp-xmit",
692                                 SdsAdd(sds, SDS{"size": len(payload)}),
693                                 "sending",
694                         )
695                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
696                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
697                                 state.Ctx.LogE("sp-xmit", sds, err, "")
698                                 break
699                         }
700                 }
701                 state.SetDead()
702                 state.wg.Done()
703         }()
704
705         state.wg.Add(1)
706         go func() {
707                 for {
708                         if state.NotAlive() {
709                                 break
710                         }
711                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
712                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
713                         payload, err := state.ReadSP(conn)
714                         if err != nil {
715                                 if err == io.EOF {
716                                         break
717                                 }
718                                 unmarshalErr := err.(*xdr.UnmarshalError)
719                                 netErr, ok := unmarshalErr.Err.(net.Error)
720                                 if ok && netErr.Timeout() {
721                                         continue
722                                 }
723                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
724                                         break
725                                 }
726                                 state.Ctx.LogE("sp-recv", sds, err, "")
727                                 break
728                         }
729                         state.Ctx.LogD(
730                                 "sp-recv",
731                                 SdsAdd(sds, SDS{"size": len(payload)}),
732                                 "got payload",
733                         )
734                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
735                         if err != nil {
736                                 state.Ctx.LogE("sp-recv", sds, err, "")
737                                 break
738                         }
739                         state.Ctx.LogD(
740                                 "sp-recv",
741                                 SdsAdd(sds, SDS{"size": len(payload)}),
742                                 "processing",
743                         )
744                         replies, err := state.ProcessSP(payload)
745                         if err != nil {
746                                 state.Ctx.LogE("sp-recv", sds, err, "")
747                                 break
748                         }
749                         state.wg.Add(1)
750                         go func() {
751                                 for _, reply := range replies {
752                                         state.Ctx.LogD(
753                                                 "sp-recv",
754                                                 SdsAdd(sds, SDS{"size": len(reply)}),
755                                                 "queuing reply",
756                                         )
757                                         state.payloads <- reply
758                                 }
759                                 state.wg.Done()
760                         }()
761                         if state.rxRate > 0 {
762                                 time.Sleep(time.Second / time.Duration(state.rxRate))
763                         }
764                 }
765                 state.SetDead()
766                 state.wg.Done()
767         }()
768
769         return nil
770 }
771
772 func (state *SPState) Wait() {
773         state.wg.Wait()
774         close(state.payloads)
775         state.dirUnlock()
776         state.Duration = time.Now().Sub(state.started)
777         state.RxSpeed = state.RxBytes
778         state.TxSpeed = state.TxBytes
779         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
780         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
781         if rxDuration > 0 {
782                 state.RxSpeed = state.RxBytes / rxDuration
783         }
784         if txDuration > 0 {
785                 state.TxSpeed = state.TxBytes / txDuration
786         }
787 }
788
789 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
790         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
791         r := bytes.NewReader(payload)
792         var err error
793         var replies [][]byte
794         var infosGot bool
795         for r.Len() > 0 {
796                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
797                 var head SPHead
798                 if _, err = xdr.Unmarshal(r, &head); err != nil {
799                         state.Ctx.LogE("sp-process", sds, err, "")
800                         return nil, err
801                 }
802                 switch head.Type {
803                 case SPTypeInfo:
804                         infosGot = true
805                         sdsp := SdsAdd(sds, SDS{"type": "info"})
806                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
807                         var info SPInfo
808                         if _, err = xdr.Unmarshal(r, &info); err != nil {
809                                 state.Ctx.LogE("sp-process", sdsp, err, "")
810                                 return nil, err
811                         }
812                         sdsp = SdsAdd(sds, SDS{
813                                 "pkt":  ToBase32(info.Hash[:]),
814                                 "size": int64(info.Size),
815                                 "nice": int(info.Nice),
816                         })
817                         if !state.listOnly && info.Nice > state.Nice {
818                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
819                                 continue
820                         }
821                         state.Ctx.LogD("sp-process", sdsp, "received")
822                         if !state.listOnly && state.xxOnly == TTx {
823                                 continue
824                         }
825                         state.Lock()
826                         state.infosTheir[*info.Hash] = &info
827                         state.Unlock()
828                         state.Ctx.LogD("sp-process", sdsp, "stating part")
829                         pktPath := filepath.Join(
830                                 state.Ctx.Spool,
831                                 state.Node.Id.String(),
832                                 string(TRx),
833                                 ToBase32(info.Hash[:]),
834                         )
835                         if _, err = os.Stat(pktPath); err == nil {
836                                 state.Ctx.LogI("sp-info", sdsp, "already done")
837                                 if !state.listOnly {
838                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
839                                 }
840                                 continue
841                         }
842                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
843                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
844                                 if !state.listOnly {
845                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
846                                 }
847                                 continue
848                         }
849                         fi, err := os.Stat(pktPath + PartSuffix)
850                         var offset int64
851                         if err == nil {
852                                 offset = fi.Size()
853                         }
854                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
855                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
856                                 continue
857                         }
858                         state.Ctx.LogI(
859                                 "sp-info",
860                                 SdsAdd(sdsp, SDS{"offset": offset}),
861                                 "",
862                         )
863                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
864                                 replies = append(replies, MarshalSP(
865                                         SPTypeFreq,
866                                         SPFreq{info.Hash, uint64(offset)},
867                                 ))
868                         }
869                 case SPTypeFile:
870                         sdsp := SdsAdd(sds, SDS{"type": "file"})
871                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
872                         var file SPFile
873                         if _, err = xdr.Unmarshal(r, &file); err != nil {
874                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
875                                 return nil, err
876                         }
877                         sdsp["xx"] = string(TRx)
878                         sdsp["pkt"] = ToBase32(file.Hash[:])
879                         sdsp["size"] = len(file.Payload)
880                         dirToSync := filepath.Join(
881                                 state.Ctx.Spool,
882                                 state.Node.Id.String(),
883                                 string(TRx),
884                         )
885                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
886                         state.Ctx.LogD("sp-file", sdsp, "opening part")
887                         fd, err := os.OpenFile(
888                                 filePath+PartSuffix,
889                                 os.O_RDWR|os.O_CREATE,
890                                 os.FileMode(0666),
891                         )
892                         if err != nil {
893                                 state.Ctx.LogE("sp-file", sdsp, err, "")
894                                 return nil, err
895                         }
896                         state.Ctx.LogD(
897                                 "sp-file",
898                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
899                                 "seeking",
900                         )
901                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
902                                 state.Ctx.LogE("sp-file", sdsp, err, "")
903                                 fd.Close()
904                                 return nil, err
905                         }
906                         state.Ctx.LogD("sp-file", sdsp, "writing")
907                         _, err = fd.Write(file.Payload)
908                         if err != nil {
909                                 state.Ctx.LogE("sp-file", sdsp, err, "")
910                                 fd.Close()
911                                 return nil, err
912                         }
913                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
914                         sdsp["size"] = ourSize
915                         fullsize := int64(0)
916                         state.RLock()
917                         infoTheir, ok := state.infosTheir[*file.Hash]
918                         state.RUnlock()
919                         if ok {
920                                 fullsize = int64(infoTheir.Size)
921                         }
922                         sdsp["fullsize"] = fullsize
923                         if state.Ctx.ShowPrgrs {
924                                 Progress("Rx", sdsp)
925                         }
926                         if fullsize != ourSize {
927                                 fd.Close()
928                                 continue
929                         }
930                         spWorkersGroup.Wait()
931                         spWorkersGroup.Add(1)
932                         go func() {
933                                 if err := fd.Sync(); err != nil {
934                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
935                                         fd.Close()
936                                         return
937                                 }
938                                 state.wg.Add(1)
939                                 defer state.wg.Done()
940                                 fd.Seek(0, io.SeekStart)
941                                 state.Ctx.LogD("sp-file", sdsp, "checking")
942                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
943                                 fd.Close()
944                                 if err != nil || !gut {
945                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
946                                         return
947                                 }
948                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
949                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
950                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
951                                         return
952                                 }
953                                 if err = DirSync(dirToSync); err != nil {
954                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
955                                         return
956                                 }
957                                 state.Lock()
958                                 delete(state.infosTheir, *file.Hash)
959                                 state.Unlock()
960                                 spWorkersGroup.Done()
961                                 state.wg.Add(1)
962                                 go func() {
963                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
964                                         state.wg.Done()
965                                 }()
966                         }()
967                 case SPTypeDone:
968                         sdsp := SdsAdd(sds, SDS{"type": "done"})
969                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
970                         var done SPDone
971                         if _, err = xdr.Unmarshal(r, &done); err != nil {
972                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
973                                 return nil, err
974                         }
975                         sdsp["pkt"] = ToBase32(done.Hash[:])
976                         state.Ctx.LogD("sp-done", sdsp, "removing")
977                         err := os.Remove(filepath.Join(
978                                 state.Ctx.Spool,
979                                 state.Node.Id.String(),
980                                 string(TTx),
981                                 ToBase32(done.Hash[:]),
982                         ))
983                         sdsp["xx"] = string(TTx)
984                         if err == nil {
985                                 state.Ctx.LogI("sp-done", sdsp, "")
986                         } else {
987                                 state.Ctx.LogE("sp-done", sdsp, err, "")
988                         }
989                 case SPTypeFreq:
990                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
991                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
992                         var freq SPFreq
993                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
994                                 state.Ctx.LogE("sp-process", sdsp, err, "")
995                                 return nil, err
996                         }
997                         sdsp["pkt"] = ToBase32(freq.Hash[:])
998                         sdsp["offset"] = freq.Offset
999                         state.Ctx.LogD("sp-process", sdsp, "queueing")
1000                         nice, exists := state.infosOurSeen[*freq.Hash]
1001                         if exists {
1002                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1003                                         state.Lock()
1004                                         insertIdx := 0
1005                                         var freqWithNice *FreqWithNice
1006                                         for insertIdx, freqWithNice = range state.queueTheir {
1007                                                 if freqWithNice.nice > nice {
1008                                                         break
1009                                                 }
1010                                         }
1011                                         state.queueTheir = append(state.queueTheir, nil)
1012                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1013                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1014                                         state.Unlock()
1015                                 } else {
1016                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
1017                                 }
1018                         } else {
1019                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
1020                         }
1021                 case SPTypeHalt:
1022                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
1023                         state.Lock()
1024                         state.queueTheir = nil
1025                         state.Unlock()
1026                 default:
1027                         state.Ctx.LogE(
1028                                 "sp-process",
1029                                 SdsAdd(sds, SDS{"type": head.Type}),
1030                                 errors.New("unknown type"),
1031                                 "",
1032                         )
1033                         return nil, BadPktType
1034                 }
1035         }
1036         if infosGot {
1037                 var pkts int
1038                 var size uint64
1039                 state.RLock()
1040                 for _, info := range state.infosTheir {
1041                         pkts++
1042                         size += info.Size
1043                 }
1044                 state.RUnlock()
1045                 state.Ctx.LogI("sp-infos", SDS{
1046                         "xx":   string(TRx),
1047                         "node": state.Node.Id,
1048                         "pkts": pkts,
1049                         "size": int64(size),
1050                 }, "")
1051         }
1052         return payloadsSplit(replies), nil
1053 }