Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Check aliveness every second
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "net"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34 )
35
// Size and timing constants shared by the SP code in this file.
const (
	// MaxSPSize is the size budget of a single SP payload: payloadsSplit
	// packs payloads up to this size, the first handshake payload is padded
	// towards it, and file chunks are read so that they fit into it.
	MaxSPSize       = 1<<16 - 256
	// PartSuffix is not used in this part of the file.
	// NOTE(review): presumably the suffix of partially received files -- confirm.
	PartSuffix      = ".part"
	// DefaultDeadline is the number of seconds used for every connection
	// read and write deadline set in this file.
	DefaultDeadline = 10

	// SPHeadOverhead is the length in bytes reserved for a marshalled SPHead
	// (SPHaltMarshalized is allocated with exactly this length).
	SPHeadOverhead = 4
)
43
var (
	// MagicNNCPLv1 is the magic value WriteSP puts into every SPRaw frame
	// and ReadSP requires on every received frame.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// The overheads below are the marshalled sizes of sample SPInfo, SPFreq
	// and (payload-less) SPFile values; they are computed once in init().
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// SPHaltMarshalized is the marshalled SPHead{Type: SPTypeHalt}; it is
	// appended repeatedly to pad the first handshake payload.
	SPHaltMarshalized []byte

	// NoiseCipherSuite is the cipher suite used for every Noise handshake
	// in this file: Curve25519 DH, ChaCha20-Poly1305 and BLAKE2b.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// spWorkersGroup is not referenced in this part of the file.
	// NOTE(review): looks like a process-wide wait group for SP workers -- confirm.
	spWorkersGroup sync.WaitGroup
)
60
// SPType is the packet kind carried in an SPHead.
type SPType uint8

// Packet kinds, numbered consecutively starting from zero.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
)
70
// SPHead is the header MarshalSP writes in front of every packet body;
// it only tells the kind of the packet that follows.
type SPHead struct {
	Type SPType
}
74
// SPInfo announces one packet: its niceness, its size and its hash.
// infosOur builds one for every outbound job it reports.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}
80
// SPFreq names a packet by hash together with an offset. The sender
// worker serves queued requests by reading the spool file starting at
// Offset.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}
85
// SPFile carries one chunk (Payload) of the packet identified by Hash,
// located at Offset inside that packet.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}
91
// SPDone refers to a single packet by its hash.
// NOTE(review): presumably sent once that packet is fully received --
// its handling is outside this part of the file, confirm there.
type SPDone struct {
	Hash *[32]byte
}
95
// SPRaw is the outermost frame written by WriteSP and read by ReadSP:
// the protocol magic followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
100
// FreqWithNice pairs a file request with a niceness value; SPState keeps
// a queue of these in queueTheir.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
105
// ConnDeadlined is the connection abstraction used by the SP workers: a
// read/write/close stream whose read and write deadlines can be set
// separately (net.Conn provides all of these methods).
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
111
112 func init() {
113         var buf bytes.Buffer
114         spHead := SPHead{Type: SPTypeHalt}
115         if _, err := xdr.Marshal(&buf, spHead); err != nil {
116                 panic(err)
117         }
118         SPHaltMarshalized = make([]byte, SPHeadOverhead)
119         copy(SPHaltMarshalized, buf.Bytes())
120         buf.Reset()
121
122         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
123         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
124                 panic(err)
125         }
126         SPInfoOverhead = buf.Len()
127         buf.Reset()
128
129         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
130         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
131                 panic(err)
132         }
133         SPFreqOverhead = buf.Len()
134         buf.Reset()
135
136         spFile := SPFile{Hash: new([32]byte), Offset: 123}
137         if _, err := xdr.Marshal(&buf, spFile); err != nil {
138                 panic(err)
139         }
140         SPFileOverhead = buf.Len()
141 }
142
143 func MarshalSP(typ SPType, sp interface{}) []byte {
144         var buf bytes.Buffer
145         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
146                 panic(err)
147         }
148         if _, err := xdr.Marshal(&buf, sp); err != nil {
149                 panic(err)
150         }
151         return buf.Bytes()
152 }
153
154 func payloadsSplit(payloads [][]byte) [][]byte {
155         var outbounds [][]byte
156         outbound := make([]byte, 0, MaxSPSize)
157         for i, payload := range payloads {
158                 outbound = append(outbound, payload...)
159                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
160                         outbounds = append(outbounds, outbound)
161                         outbound = make([]byte, 0, MaxSPSize)
162                 }
163         }
164         if len(outbound) > 0 {
165                 outbounds = append(outbounds, outbound)
166         }
167         return outbounds
168 }
169
// SPState holds everything belonging to a single SP session with one
// node: configuration, Noise handshake and cipher states, the queues the
// worker goroutines share, and traffic statistics. The embedded RWMutex
// is taken by SetDead and around accesses to queueTheir in the sender
// worker.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	// Nice is the niceness threshold passed to infosOur: outbound jobs
	// with a greater niceness value are not announced.
	Nice           uint8
	// onlineDeadline is the number of seconds of simultaneous rx and tx
	// silence after which the deadline checker kills the session.
	onlineDeadline uint
	// maxOnlineTime, when positive, bounds the total session duration.
	maxOnlineTime  time.Duration
	hs             *noise.HandshakeState
	// csOur encrypts outgoing payloads, csTheir decrypts incoming ones.
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	// payloads is the unbuffered channel of ready-to-send payloads
	// consumed by the sender worker.
	payloads       chan []byte
	// NOTE(review): infosTheir is only allocated in this part of the
	// file; presumably it holds INFOs received from the peer -- confirm.
	infosTheir     map[[32]byte]*SPInfo
	// infosOurSeen records the hashes infosOur has already announced.
	infosOurSeen   map[[32]byte]uint8
	// queueTheir is the queue of file requests the sender worker serves,
	// always starting from its head.
	queueTheir     []*FreqWithNice
	// wg tracks the worker goroutines; Wait blocks on it.
	wg             sync.WaitGroup
	// Rx*/Tx* counters and timestamps are updated by ReadSP and WriteSP.
	RxBytes        int64
	RxLastSeen     time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	started        time.Time
	// mustFinishAt is started+maxOnlineTime; set only if maxOnlineTime > 0.
	mustFinishAt   time.Time
	// Duration, RxSpeed and TxSpeed are filled in by Wait.
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	// rxLock and txLock are the directory locks released by dirUnlock.
	rxLock         *os.File
	txLock         *os.File
	// xxOnly restricts the session to one direction; "" means both.
	xxOnly         TRxTx
	// rxRate and txRate, when positive, make the receiver and sender
	// sleep time.Second/rate per packet.
	rxRate         int
	txRate         int
	// isDead is closed by SetDead to tell all workers to stop.
	isDead         chan struct{}
	// listOnly disables directory locking and announcing of our packets.
	listOnly       bool
	// NOTE(review): onlyPkts is not used in this part of the file;
	// presumably a whitelist of packet hashes -- confirm.
	onlyPkts       map[[32]byte]bool
	// writeSPBuf is the scratch buffer WriteSP reuses for every frame.
	writeSPBuf     bytes.Buffer
	sync.RWMutex
}
204
205 func (state *SPState) SetDead() {
206         state.Lock()
207         defer state.Unlock()
208         select {
209         case _, ok := <-state.isDead:
210                 if !ok {
211                         // Already closed channel, dead
212                         return
213                 }
214         default:
215         }
216         close(state.isDead)
217         go func() {
218                 for _ = range state.payloads {
219                 }
220         }()
221 }
222
223 func (state *SPState) NotAlive() bool {
224         select {
225         case _, ok := <-state.isDead:
226                 if !ok {
227                         return true
228                 }
229         default:
230         }
231         return false
232 }
233
234 func (state *SPState) dirUnlock() {
235         state.Ctx.UnlockDir(state.rxLock)
236         state.Ctx.UnlockDir(state.txLock)
237 }
238
239 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
240         state.writeSPBuf.Reset()
241         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
242                 Magic:   MagicNNCPLv1,
243                 Payload: payload,
244         })
245         if err != nil {
246                 return err
247         }
248         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
249                 state.TxLastSeen = time.Now()
250                 state.TxBytes += int64(n)
251         }
252         return err
253 }
254
255 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
256         var sp SPRaw
257         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
258         if err != nil {
259                 ue := err.(*xdr.UnmarshalError)
260                 if ue.Err == io.EOF {
261                         return nil, ue.Err
262                 }
263                 return nil, err
264         }
265         state.RxLastSeen = time.Now()
266         state.RxBytes += int64(n)
267         if sp.Magic != MagicNNCPLv1 {
268                 return nil, BadMagic
269         }
270         return sp.Payload, nil
271 }
272
273 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
274         var infos []*SPInfo
275         var totalSize int64
276         for job := range ctx.Jobs(nodeId, TTx) {
277                 job.Fd.Close()
278                 if job.PktEnc.Nice > nice {
279                         continue
280                 }
281                 if _, known := (*seen)[*job.HshValue]; known {
282                         continue
283                 }
284                 totalSize += job.Size
285                 infos = append(infos, &SPInfo{
286                         Nice: job.PktEnc.Nice,
287                         Size: uint64(job.Size),
288                         Hash: job.HshValue,
289                 })
290                 (*seen)[*job.HshValue] = job.PktEnc.Nice
291         }
292         sort.Sort(ByNice(infos))
293         var payloads [][]byte
294         for _, info := range infos {
295                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
296                 ctx.LogD("sp-info-our", SDS{
297                         "node": nodeId,
298                         "name": ToBase32(info.Hash[:]),
299                         "size": info.Size,
300                 }, "")
301         }
302         if totalSize > 0 {
303                 ctx.LogI("sp-infos", SDS{
304                         "xx":   string(TTx),
305                         "node": nodeId,
306                         "pkts": len(payloads),
307                         "size": totalSize,
308                 }, "")
309         }
310         return payloadsSplit(payloads)
311 }
312
313 func (state *SPState) StartI(conn ConnDeadlined) error {
314         nodeId := state.Node.Id
315         err := state.Ctx.ensureRxDir(nodeId)
316         if err != nil {
317                 return err
318         }
319         var rxLock *os.File
320         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
321                 rxLock, err = state.Ctx.LockDir(nodeId, TRx)
322                 if err != nil {
323                         return err
324                 }
325         }
326         var txLock *os.File
327         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
328                 txLock, err = state.Ctx.LockDir(nodeId, TTx)
329                 if err != nil {
330                         return err
331                 }
332         }
333         started := time.Now()
334         conf := noise.Config{
335                 CipherSuite: NoiseCipherSuite,
336                 Pattern:     noise.HandshakeIK,
337                 Initiator:   true,
338                 StaticKeypair: noise.DHKey{
339                         Private: state.Ctx.Self.NoisePrv[:],
340                         Public:  state.Ctx.Self.NoisePub[:],
341                 },
342                 PeerStatic: state.Node.NoisePub[:],
343         }
344         hs, err := noise.NewHandshakeState(conf)
345         if err != nil {
346                 return err
347         }
348         state.hs = hs
349         state.payloads = make(chan []byte)
350         state.infosTheir = make(map[[32]byte]*SPInfo)
351         state.infosOurSeen = make(map[[32]byte]uint8)
352         state.started = started
353         state.rxLock = rxLock
354         state.txLock = txLock
355
356         var infosPayloads [][]byte
357         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
358                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
359         }
360         var firstPayload []byte
361         if len(infosPayloads) > 0 {
362                 firstPayload = infosPayloads[0]
363         }
364         // Pad first payload, to hide actual number of existing files
365         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
366                 firstPayload = append(firstPayload, SPHaltMarshalized...)
367         }
368
369         var buf []byte
370         var payload []byte
371         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
372         if err != nil {
373                 state.dirUnlock()
374                 return err
375         }
376         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
377         state.Ctx.LogD("sp-start", sds, "sending first message")
378         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
379         if err = state.WriteSP(conn, buf); err != nil {
380                 state.Ctx.LogE("sp-start", sds, err, "")
381                 state.dirUnlock()
382                 return err
383         }
384         state.Ctx.LogD("sp-start", sds, "waiting for first message")
385         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
386         if buf, err = state.ReadSP(conn); err != nil {
387                 state.Ctx.LogE("sp-start", sds, err, "")
388                 state.dirUnlock()
389                 return err
390         }
391         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
392         if err != nil {
393                 state.Ctx.LogE("sp-start", sds, err, "")
394                 state.dirUnlock()
395                 return err
396         }
397         state.Ctx.LogD("sp-start", sds, "starting workers")
398         err = state.StartWorkers(conn, infosPayloads, payload)
399         if err != nil {
400                 state.Ctx.LogE("sp-start", sds, err, "")
401                 state.dirUnlock()
402         }
403         return err
404 }
405
406 func (state *SPState) StartR(conn ConnDeadlined) error {
407         started := time.Now()
408         conf := noise.Config{
409                 CipherSuite: NoiseCipherSuite,
410                 Pattern:     noise.HandshakeIK,
411                 Initiator:   false,
412                 StaticKeypair: noise.DHKey{
413                         Private: state.Ctx.Self.NoisePrv[:],
414                         Public:  state.Ctx.Self.NoisePub[:],
415                 },
416         }
417         hs, err := noise.NewHandshakeState(conf)
418         if err != nil {
419                 return err
420         }
421         xxOnly := TRxTx("")
422         state.hs = hs
423         state.payloads = make(chan []byte)
424         state.infosOurSeen = make(map[[32]byte]uint8)
425         state.infosTheir = make(map[[32]byte]*SPInfo)
426         state.started = started
427         state.xxOnly = xxOnly
428         var buf []byte
429         var payload []byte
430         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
431         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
432         if buf, err = state.ReadSP(conn); err != nil {
433                 state.Ctx.LogE("sp-start", SDS{}, err, "")
434                 return err
435         }
436         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
437                 state.Ctx.LogE("sp-start", SDS{}, err, "")
438                 return err
439         }
440
441         var node *Node
442         for _, n := range state.Ctx.Neigh {
443                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
444                         node = n
445                         break
446                 }
447         }
448         if node == nil {
449                 peerId := ToBase32(state.hs.PeerStatic())
450                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
451                 return errors.New("Unknown peer: " + peerId)
452         }
453         state.Node = node
454         state.rxRate = node.RxRate
455         state.txRate = node.TxRate
456         state.onlineDeadline = node.OnlineDeadline
457         state.maxOnlineTime = node.MaxOnlineTime
458         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
459
460         if state.Ctx.ensureRxDir(node.Id); err != nil {
461                 return err
462         }
463         var rxLock *os.File
464         if xxOnly == "" || xxOnly == TRx {
465                 rxLock, err = state.Ctx.LockDir(node.Id, TRx)
466                 if err != nil {
467                         return err
468                 }
469         }
470         state.rxLock = rxLock
471         var txLock *os.File
472         if xxOnly == "" || xxOnly == TTx {
473                 txLock, err = state.Ctx.LockDir(node.Id, TTx)
474                 if err != nil {
475                         return err
476                 }
477         }
478         state.txLock = txLock
479
480         var infosPayloads [][]byte
481         if xxOnly == "" || xxOnly == TTx {
482                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
483         }
484         var firstPayload []byte
485         if len(infosPayloads) > 0 {
486                 firstPayload = infosPayloads[0]
487         }
488         // Pad first payload, to hide actual number of existing files
489         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
490                 firstPayload = append(firstPayload, SPHaltMarshalized...)
491         }
492
493         state.Ctx.LogD("sp-start", sds, "sending first message")
494         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
495         if err != nil {
496                 state.dirUnlock()
497                 return err
498         }
499         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
500         if err = state.WriteSP(conn, buf); err != nil {
501                 state.Ctx.LogE("sp-start", sds, err, "")
502                 state.dirUnlock()
503                 return err
504         }
505         state.Ctx.LogD("sp-start", sds, "starting workers")
506         err = state.StartWorkers(conn, infosPayloads, payload)
507         if err != nil {
508                 state.dirUnlock()
509         }
510         return err
511 }
512
513 func (state *SPState) StartWorkers(
514         conn ConnDeadlined,
515         infosPayloads [][]byte,
516         payload []byte,
517 ) error {
518         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
519         state.isDead = make(chan struct{})
520         if state.maxOnlineTime > 0 {
521                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
522         }
523
524         // Remaining handshake payload sending
525         if len(infosPayloads) > 1 {
526                 state.wg.Add(1)
527                 go func() {
528                         for _, payload := range infosPayloads[1:] {
529                                 state.Ctx.LogD(
530                                         "sp-work",
531                                         SdsAdd(sds, SDS{"size": len(payload)}),
532                                         "queuing remaining payload",
533                                 )
534                                 state.payloads <- payload
535                         }
536                         state.wg.Done()
537                 }()
538         }
539
540         // Processing of first payload and queueing its responses
541         state.Ctx.LogD(
542                 "sp-work",
543                 SdsAdd(sds, SDS{"size": len(payload)}),
544                 "processing first payload",
545         )
546         replies, err := state.ProcessSP(payload)
547         if err != nil {
548                 state.Ctx.LogE("sp-work", sds, err, "")
549                 return err
550         }
551         state.wg.Add(1)
552         go func() {
553                 for _, reply := range replies {
554                         state.Ctx.LogD(
555                                 "sp-work",
556                                 SdsAdd(sds, SDS{"size": len(reply)}),
557                                 "queuing reply",
558                         )
559                         state.payloads <- reply
560                 }
561                 state.wg.Done()
562         }()
563
564         // Deadline checker
565         state.wg.Add(1)
566         go func() {
567                 ticker := time.NewTicker(time.Second)
568                 defer ticker.Stop()
569                 defer state.wg.Done()
570                 for {
571                         select {
572                         case _, ok := <-state.isDead:
573                                 if !ok {
574                                         return
575                                 }
576                         case now := <-ticker.C:
577                                 if (uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
578                                         uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline) ||
579                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) {
580                                         state.SetDead()
581                                         conn.Close()
582                                         return
583                                 }
584                         }
585                 }
586         }()
587
588         // Spool checker and INFOs sender of appearing files
589         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
590                 state.wg.Add(1)
591                 go func() {
592                         ticker := time.NewTicker(time.Second)
593                         for {
594                                 select {
595                                 case _, ok := <-state.isDead:
596                                         if !ok {
597                                                 state.wg.Done()
598                                                 ticker.Stop()
599                                                 return
600                                         }
601                                 case <-ticker.C:
602                                         for _, payload := range state.Ctx.infosOur(
603                                                 state.Node.Id,
604                                                 state.Nice,
605                                                 &state.infosOurSeen,
606                                         ) {
607                                                 state.Ctx.LogD(
608                                                         "sp-work",
609                                                         SdsAdd(sds, SDS{"size": len(payload)}),
610                                                         "queuing new info",
611                                                 )
612                                                 state.payloads <- payload
613                                         }
614                                 }
615                         }
616                 }()
617         }
618
619         // Sender
620         state.wg.Add(1)
621         go func() {
622                 for {
623                         if state.NotAlive() {
624                                 break
625                         }
626                         var payload []byte
627                         select {
628                         case payload = <-state.payloads:
629                                 state.Ctx.LogD(
630                                         "sp-xmit",
631                                         SdsAdd(sds, SDS{"size": len(payload)}),
632                                         "got payload",
633                                 )
634                         default:
635                         }
636                         if payload == nil {
637                                 state.RLock()
638                                 if len(state.queueTheir) == 0 {
639                                         state.RUnlock()
640                                         time.Sleep(100 * time.Millisecond)
641                                         continue
642                                 }
643                                 freq := state.queueTheir[0].freq
644                                 state.RUnlock()
645
646                                 if state.txRate > 0 {
647                                         time.Sleep(time.Second / time.Duration(state.txRate))
648                                 }
649
650                                 sdsp := SdsAdd(sds, SDS{
651                                         "xx":   string(TTx),
652                                         "pkt":  ToBase32(freq.Hash[:]),
653                                         "size": int64(freq.Offset),
654                                 })
655                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
656                                 fd, err := os.Open(filepath.Join(
657                                         state.Ctx.Spool,
658                                         state.Node.Id.String(),
659                                         string(TTx),
660                                         ToBase32(freq.Hash[:]),
661                                 ))
662                                 if err != nil {
663                                         state.Ctx.LogE("sp-file", sdsp, err, "")
664                                         break
665                                 }
666                                 fi, err := fd.Stat()
667                                 if err != nil {
668                                         state.Ctx.LogE("sp-file", sdsp, err, "")
669                                         break
670                                 }
671                                 fullSize := fi.Size()
672                                 var buf []byte
673                                 if freq.Offset < uint64(fullSize) {
674                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
675                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
676                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
677                                                 break
678                                         }
679                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
680                                         n, err := fd.Read(buf)
681                                         if err != nil {
682                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
683                                                 break
684                                         }
685                                         buf = buf[:n]
686                                         state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
687                                 }
688                                 fd.Close()
689                                 payload = MarshalSP(SPTypeFile, SPFile{
690                                         Hash:    freq.Hash,
691                                         Offset:  freq.Offset,
692                                         Payload: buf,
693                                 })
694                                 ourSize := freq.Offset + uint64(len(buf))
695                                 sdsp["size"] = int64(ourSize)
696                                 sdsp["fullsize"] = fullSize
697                                 if state.Ctx.ShowPrgrs {
698                                         Progress("Tx", sdsp)
699                                 }
700                                 state.Lock()
701                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
702                                         if ourSize == uint64(fullSize) {
703                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
704                                                 if len(state.queueTheir) > 1 {
705                                                         state.queueTheir = state.queueTheir[1:]
706                                                 } else {
707                                                         state.queueTheir = state.queueTheir[:0]
708                                                 }
709                                         } else {
710                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
711                                         }
712                                 } else {
713                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
714                                 }
715                                 state.Unlock()
716                         }
717                         state.Ctx.LogD(
718                                 "sp-xmit",
719                                 SdsAdd(sds, SDS{"size": len(payload)}),
720                                 "sending",
721                         )
722                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
723                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
724                                 state.Ctx.LogE("sp-xmit", sds, err, "")
725                                 break
726                         }
727                 }
728                 state.SetDead()
729                 state.wg.Done()
730         }()
731
732         // Receiver
733         state.wg.Add(1)
734         go func() {
735                 for {
736                         if state.NotAlive() {
737                                 break
738                         }
739                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
740                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
741                         payload, err := state.ReadSP(conn)
742                         if err != nil {
743                                 if err == io.EOF {
744                                         break
745                                 }
746                                 unmarshalErr := err.(*xdr.UnmarshalError)
747                                 netErr, ok := unmarshalErr.Err.(net.Error)
748                                 if ok && netErr.Timeout() {
749                                         continue
750                                 }
751                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
752                                         break
753                                 }
754                                 state.Ctx.LogE("sp-recv", sds, err, "")
755                                 break
756                         }
757                         state.Ctx.LogD(
758                                 "sp-recv",
759                                 SdsAdd(sds, SDS{"size": len(payload)}),
760                                 "got payload",
761                         )
762                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
763                         if err != nil {
764                                 state.Ctx.LogE("sp-recv", sds, err, "")
765                                 break
766                         }
767                         state.Ctx.LogD(
768                                 "sp-recv",
769                                 SdsAdd(sds, SDS{"size": len(payload)}),
770                                 "processing",
771                         )
772                         replies, err := state.ProcessSP(payload)
773                         if err != nil {
774                                 state.Ctx.LogE("sp-recv", sds, err, "")
775                                 break
776                         }
777                         state.wg.Add(1)
778                         go func() {
779                                 for _, reply := range replies {
780                                         state.Ctx.LogD(
781                                                 "sp-recv",
782                                                 SdsAdd(sds, SDS{"size": len(reply)}),
783                                                 "queuing reply",
784                                         )
785                                         state.payloads <- reply
786                                 }
787                                 state.wg.Done()
788                         }()
789                         if state.rxRate > 0 {
790                                 time.Sleep(time.Second / time.Duration(state.rxRate))
791                         }
792                 }
793                 state.SetDead()
794                 state.wg.Done()
795         }()
796
797         return nil
798 }
799
800 func (state *SPState) Wait() {
801         state.wg.Wait()
802         close(state.payloads)
803         state.dirUnlock()
804         state.Duration = time.Now().Sub(state.started)
805         state.RxSpeed = state.RxBytes
806         state.TxSpeed = state.TxBytes
807         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
808         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
809         if rxDuration > 0 {
810                 state.RxSpeed = state.RxBytes / rxDuration
811         }
812         if txDuration > 0 {
813                 state.TxSpeed = state.TxBytes / txDuration
814         }
815 }
816
817 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
818         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
819         r := bytes.NewReader(payload)
820         var err error
821         var replies [][]byte
822         var infosGot bool
823         for r.Len() > 0 {
824                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
825                 var head SPHead
826                 if _, err = xdr.Unmarshal(r, &head); err != nil {
827                         state.Ctx.LogE("sp-process", sds, err, "")
828                         return nil, err
829                 }
830                 switch head.Type {
831                 case SPTypeInfo:
832                         infosGot = true
833                         sdsp := SdsAdd(sds, SDS{"type": "info"})
834                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
835                         var info SPInfo
836                         if _, err = xdr.Unmarshal(r, &info); err != nil {
837                                 state.Ctx.LogE("sp-process", sdsp, err, "")
838                                 return nil, err
839                         }
840                         sdsp = SdsAdd(sds, SDS{
841                                 "pkt":  ToBase32(info.Hash[:]),
842                                 "size": int64(info.Size),
843                                 "nice": int(info.Nice),
844                         })
845                         if !state.listOnly && info.Nice > state.Nice {
846                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
847                                 continue
848                         }
849                         state.Ctx.LogD("sp-process", sdsp, "received")
850                         if !state.listOnly && state.xxOnly == TTx {
851                                 continue
852                         }
853                         state.Lock()
854                         state.infosTheir[*info.Hash] = &info
855                         state.Unlock()
856                         state.Ctx.LogD("sp-process", sdsp, "stating part")
857                         pktPath := filepath.Join(
858                                 state.Ctx.Spool,
859                                 state.Node.Id.String(),
860                                 string(TRx),
861                                 ToBase32(info.Hash[:]),
862                         )
863                         if _, err = os.Stat(pktPath); err == nil {
864                                 state.Ctx.LogI("sp-info", sdsp, "already done")
865                                 if !state.listOnly {
866                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
867                                 }
868                                 continue
869                         }
870                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
871                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
872                                 if !state.listOnly {
873                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
874                                 }
875                                 continue
876                         }
877                         fi, err := os.Stat(pktPath + PartSuffix)
878                         var offset int64
879                         if err == nil {
880                                 offset = fi.Size()
881                         }
882                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
883                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
884                                 continue
885                         }
886                         state.Ctx.LogI(
887                                 "sp-info",
888                                 SdsAdd(sdsp, SDS{"offset": offset}),
889                                 "",
890                         )
891                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
892                                 replies = append(replies, MarshalSP(
893                                         SPTypeFreq,
894                                         SPFreq{info.Hash, uint64(offset)},
895                                 ))
896                         }
897                 case SPTypeFile:
898                         sdsp := SdsAdd(sds, SDS{"type": "file"})
899                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
900                         var file SPFile
901                         if _, err = xdr.Unmarshal(r, &file); err != nil {
902                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
903                                 return nil, err
904                         }
905                         sdsp["xx"] = string(TRx)
906                         sdsp["pkt"] = ToBase32(file.Hash[:])
907                         sdsp["size"] = len(file.Payload)
908                         dirToSync := filepath.Join(
909                                 state.Ctx.Spool,
910                                 state.Node.Id.String(),
911                                 string(TRx),
912                         )
913                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
914                         state.Ctx.LogD("sp-file", sdsp, "opening part")
915                         fd, err := os.OpenFile(
916                                 filePath+PartSuffix,
917                                 os.O_RDWR|os.O_CREATE,
918                                 os.FileMode(0666),
919                         )
920                         if err != nil {
921                                 state.Ctx.LogE("sp-file", sdsp, err, "")
922                                 return nil, err
923                         }
924                         state.Ctx.LogD(
925                                 "sp-file",
926                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
927                                 "seeking",
928                         )
929                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
930                                 state.Ctx.LogE("sp-file", sdsp, err, "")
931                                 fd.Close()
932                                 return nil, err
933                         }
934                         state.Ctx.LogD("sp-file", sdsp, "writing")
935                         _, err = fd.Write(file.Payload)
936                         if err != nil {
937                                 state.Ctx.LogE("sp-file", sdsp, err, "")
938                                 fd.Close()
939                                 return nil, err
940                         }
941                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
942                         sdsp["size"] = ourSize
943                         fullsize := int64(0)
944                         state.RLock()
945                         infoTheir, ok := state.infosTheir[*file.Hash]
946                         state.RUnlock()
947                         if ok {
948                                 fullsize = int64(infoTheir.Size)
949                         }
950                         sdsp["fullsize"] = fullsize
951                         if state.Ctx.ShowPrgrs {
952                                 Progress("Rx", sdsp)
953                         }
954                         if fullsize != ourSize {
955                                 fd.Close()
956                                 continue
957                         }
958                         spWorkersGroup.Wait()
959                         spWorkersGroup.Add(1)
960                         go func() {
961                                 if err := fd.Sync(); err != nil {
962                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
963                                         fd.Close()
964                                         return
965                                 }
966                                 state.wg.Add(1)
967                                 defer state.wg.Done()
968                                 fd.Seek(0, io.SeekStart)
969                                 state.Ctx.LogD("sp-file", sdsp, "checking")
970                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
971                                 fd.Close()
972                                 if err != nil || !gut {
973                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
974                                         return
975                                 }
976                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
977                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
978                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
979                                         return
980                                 }
981                                 if err = DirSync(dirToSync); err != nil {
982                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
983                                         return
984                                 }
985                                 state.Lock()
986                                 delete(state.infosTheir, *file.Hash)
987                                 state.Unlock()
988                                 spWorkersGroup.Done()
989                                 state.wg.Add(1)
990                                 go func() {
991                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
992                                         state.wg.Done()
993                                 }()
994                         }()
995                 case SPTypeDone:
996                         sdsp := SdsAdd(sds, SDS{"type": "done"})
997                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
998                         var done SPDone
999                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1000                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
1001                                 return nil, err
1002                         }
1003                         sdsp["pkt"] = ToBase32(done.Hash[:])
1004                         state.Ctx.LogD("sp-done", sdsp, "removing")
1005                         err := os.Remove(filepath.Join(
1006                                 state.Ctx.Spool,
1007                                 state.Node.Id.String(),
1008                                 string(TTx),
1009                                 ToBase32(done.Hash[:]),
1010                         ))
1011                         sdsp["xx"] = string(TTx)
1012                         if err == nil {
1013                                 state.Ctx.LogI("sp-done", sdsp, "")
1014                         } else {
1015                                 state.Ctx.LogE("sp-done", sdsp, err, "")
1016                         }
1017                 case SPTypeFreq:
1018                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
1019                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1020                         var freq SPFreq
1021                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1022                                 state.Ctx.LogE("sp-process", sdsp, err, "")
1023                                 return nil, err
1024                         }
1025                         sdsp["pkt"] = ToBase32(freq.Hash[:])
1026                         sdsp["offset"] = freq.Offset
1027                         state.Ctx.LogD("sp-process", sdsp, "queueing")
1028                         nice, exists := state.infosOurSeen[*freq.Hash]
1029                         if exists {
1030                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1031                                         state.Lock()
1032                                         insertIdx := 0
1033                                         var freqWithNice *FreqWithNice
1034                                         for insertIdx, freqWithNice = range state.queueTheir {
1035                                                 if freqWithNice.nice > nice {
1036                                                         break
1037                                                 }
1038                                         }
1039                                         state.queueTheir = append(state.queueTheir, nil)
1040                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1041                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1042                                         state.Unlock()
1043                                 } else {
1044                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
1045                                 }
1046                         } else {
1047                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
1048                         }
1049                 case SPTypeHalt:
1050                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
1051                         state.Lock()
1052                         state.queueTheir = nil
1053                         state.Unlock()
1054                 default:
1055                         state.Ctx.LogE(
1056                                 "sp-process",
1057                                 SdsAdd(sds, SDS{"type": head.Type}),
1058                                 errors.New("unknown type"),
1059                                 "",
1060                         )
1061                         return nil, BadPktType
1062                 }
1063         }
1064         if infosGot {
1065                 var pkts int
1066                 var size uint64
1067                 state.RLock()
1068                 for _, info := range state.infosTheir {
1069                         pkts++
1070                         size += info.Size
1071                 }
1072                 state.RUnlock()
1073                 state.Ctx.LogI("sp-infos", SDS{
1074                         "xx":   string(TRx),
1075                         "node": state.Node.Id,
1076                         "pkts": pkts,
1077                         "size": int64(size),
1078                 }, "")
1079         }
1080         return payloadsSplit(replies), nil
1081 }