]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
PINGs
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "net"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34 )
35
const (
	// MaxSPSize is the upper bound, in bytes, that payloadsSplit keeps a
	// combined outbound payload under; the first handshake payload is
	// padded up to it.
	MaxSPSize = 1<<16 - 256
	// PartSuffix is not used in this part of the file.
	// NOTE(review): presumably the suffix of partially received files -- confirm.
	PartSuffix = ".part"
	// SPHeadOverhead is the size in bytes of a marshalled SPHead; init()
	// allocates the pre-marshalled HALT/PING packets with this length.
	SPHeadOverhead = 4
)
41
var (
	// MagicNNCPLv1 is the magic number put into every SPRaw frame by
	// WriteSP; ReadSP rejects frames carrying any other value.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// Marshalled sizes of SPInfo, SPFreq and SPFile (the latter without any
	// Payload), measured once in init().
	SPInfoOverhead int
	SPFreqOverhead int
	SPFileOverhead int
	// Pre-marshalled SPHead packets of HALT and PING type, built in init().
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the cipher suite used for the Noise handshake in
	// StartI and StartR.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline is the read/write deadline set on the connection
	// before each send and receive.
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker in StartWorkers.
	PingTimeout = time.Minute

	// spWorkersGroup is not referenced in this part of the file.
	// NOTE(review): presumably tracks running SP sessions -- confirm.
	spWorkersGroup sync.WaitGroup
)
62
// SPType identifies the kind of packet announced by an SPHead.
type SPType uint8

// Known packet types. Values are assigned sequentially starting from zero
// and are part of the wire format, so their order must not change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
73
// SPHead is the header that MarshalSP writes in front of every packet
// body; it carries only the packet type.
type SPHead struct {
	Type SPType
}
77
// SPInfo announces a single packet. infosOur fills it from an outbound
// job: its niceness, its size and its hash.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}
83
// SPFreq names a packet by hash together with an offset. The sender in
// StartWorkers opens the spool file named after Hash and reads it starting
// at Offset.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}
88
// SPFile carries a chunk of a packet: Payload holds the bytes read from
// the file identified by Hash, starting at Offset.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}
94
// SPDone refers to a packet by its hash.
// NOTE(review): presumably acknowledges complete reception of that packet;
// its use is not visible in this part of the file -- confirm.
type SPDone struct {
	Hash *[32]byte
}
98
// SPRaw is the outermost frame put on the wire by WriteSP and parsed by
// ReadSP: a magic number followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
103
// FreqWithNice pairs an SPFreq with a niceness value; SPState.queueTheir
// is a slice of these.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
108
// ConnDeadlined is the connection abstraction the SP code works with: a
// byte stream that can be closed and that supports separate read and write
// deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
114
115 func init() {
116         var buf bytes.Buffer
117         spHead := SPHead{Type: SPTypeHalt}
118         if _, err := xdr.Marshal(&buf, spHead); err != nil {
119                 panic(err)
120         }
121         SPHaltMarshalized = make([]byte, SPHeadOverhead)
122         copy(SPHaltMarshalized, buf.Bytes())
123         buf.Reset()
124
125         spHead = SPHead{Type: SPTypePing}
126         if _, err := xdr.Marshal(&buf, spHead); err != nil {
127                 panic(err)
128         }
129         SPPingMarshalized = make([]byte, SPHeadOverhead)
130         copy(SPPingMarshalized, buf.Bytes())
131         buf.Reset()
132
133         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
134         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
135                 panic(err)
136         }
137         SPInfoOverhead = buf.Len()
138         buf.Reset()
139
140         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
141         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
142                 panic(err)
143         }
144         SPFreqOverhead = buf.Len()
145         buf.Reset()
146
147         spFile := SPFile{Hash: new([32]byte), Offset: 123}
148         if _, err := xdr.Marshal(&buf, spFile); err != nil {
149                 panic(err)
150         }
151         SPFileOverhead = buf.Len()
152 }
153
154 func MarshalSP(typ SPType, sp interface{}) []byte {
155         var buf bytes.Buffer
156         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
157                 panic(err)
158         }
159         if _, err := xdr.Marshal(&buf, sp); err != nil {
160                 panic(err)
161         }
162         return buf.Bytes()
163 }
164
165 func payloadsSplit(payloads [][]byte) [][]byte {
166         var outbounds [][]byte
167         outbound := make([]byte, 0, MaxSPSize)
168         for i, payload := range payloads {
169                 outbound = append(outbound, payload...)
170                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
171                         outbounds = append(outbounds, outbound)
172                         outbound = make([]byte, 0, MaxSPSize)
173                 }
174         }
175         if len(outbound) > 0 {
176                 outbounds = append(outbounds, outbound)
177         }
178         return outbounds
179 }
180
// SPState holds everything related to a single synchronization protocol
// session with one node: handshake and cipher states, queues, traffic
// counters and spool directory locks. The embedded RWMutex guards
// queueTheir and the closing of isDead.
type SPState struct {
	Ctx  *Ctx
	Node *Node
	// Nice is the highest niceness of our packets that infosOur announces.
	Nice uint8
	// The session is terminated by the periodic job in StartWorkers when
	// neither direction saw non-ping traffic for onlineDeadline, or when
	// maxOnlineTime (if positive) has passed since the start.
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	// Noise handshake state and the cipher states derived from it.
	hs      *noise.HandshakeState
	csOur   *noise.CipherState
	csTheir *noise.CipherState
	// payloads carries marshalled packets to the sender goroutine; pings
	// asks it to send a PING packet.
	payloads     chan []byte
	pings        chan struct{}
	infosTheir   map[[32]byte]*SPInfo
	// infosOurSeen remembers hashes already announced by infosOur.
	infosOurSeen map[[32]byte]uint8
	// queueTheir is the queue of file requests served by the sender.
	queueTheir []*FreqWithNice
	// wg tracks the goroutines started by StartWorkers.
	wg sync.WaitGroup
	// Traffic counters and timestamps updated by ReadSP and WriteSP.
	RxBytes       int64
	RxLastSeen    time.Time
	RxLastNonPing time.Time
	TxBytes       int64
	TxLastSeen    time.Time
	TxLastNonPing time.Time
	// started is taken at the beginning of StartI/StartR; mustFinishAt is
	// started plus maxOnlineTime when the latter is positive.
	started      time.Time
	mustFinishAt time.Time
	Duration     time.Duration
	RxSpeed      int64
	TxSpeed      int64
	// Spool directory locks obtained through Ctx.LockDir, released by
	// dirUnlock. Either may be nil.
	rxLock *os.File
	txLock *os.File
	// xxOnly restricts the session to a single direction when non-empty.
	xxOnly TRxTx
	// Non-zero rates make receiver/sender sleep 1s/rate between packets.
	rxRate int
	txRate int
	// isDead is closed by SetDead to signal the end of the session.
	isDead   chan struct{}
	listOnly bool
	onlyPkts map[[32]byte]bool
	// writeSPBuf is the scratch buffer reused by WriteSP on every call.
	writeSPBuf bytes.Buffer
	sync.RWMutex
}
218
219 func (state *SPState) SetDead() {
220         state.Lock()
221         defer state.Unlock()
222         select {
223         case <-state.isDead:
224                 // Already closed channel, dead
225                 return
226         default:
227         }
228         close(state.isDead)
229         go func() {
230                 for _ = range state.payloads {
231                 }
232         }()
233         go func() {
234                 for _ = range state.pings {
235                 }
236         }()
237 }
238
239 func (state *SPState) NotAlive() bool {
240         select {
241         case <-state.isDead:
242                 return true
243         default:
244         }
245         return false
246 }
247
248 func (state *SPState) dirUnlock() {
249         state.Ctx.UnlockDir(state.rxLock)
250         state.Ctx.UnlockDir(state.txLock)
251 }
252
253 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
254         state.writeSPBuf.Reset()
255         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
256                 Magic:   MagicNNCPLv1,
257                 Payload: payload,
258         })
259         if err != nil {
260                 return err
261         }
262         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
263                 state.TxLastSeen = time.Now()
264                 state.TxBytes += int64(n)
265                 if !ping {
266                         state.TxLastNonPing = state.TxLastSeen
267                 }
268         }
269         return err
270 }
271
272 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
273         var sp SPRaw
274         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
275         if err != nil {
276                 ue := err.(*xdr.UnmarshalError)
277                 if ue.Err == io.EOF {
278                         return nil, ue.Err
279                 }
280                 return nil, err
281         }
282         state.RxLastSeen = time.Now()
283         state.RxBytes += int64(n)
284         if sp.Magic != MagicNNCPLv1 {
285                 return nil, BadMagic
286         }
287         return sp.Payload, nil
288 }
289
290 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
291         var infos []*SPInfo
292         var totalSize int64
293         for job := range ctx.Jobs(nodeId, TTx) {
294                 job.Fd.Close()
295                 if job.PktEnc.Nice > nice {
296                         continue
297                 }
298                 if _, known := (*seen)[*job.HshValue]; known {
299                         continue
300                 }
301                 totalSize += job.Size
302                 infos = append(infos, &SPInfo{
303                         Nice: job.PktEnc.Nice,
304                         Size: uint64(job.Size),
305                         Hash: job.HshValue,
306                 })
307                 (*seen)[*job.HshValue] = job.PktEnc.Nice
308         }
309         sort.Sort(ByNice(infos))
310         var payloads [][]byte
311         for _, info := range infos {
312                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
313                 ctx.LogD("sp-info-our", SDS{
314                         "node": nodeId,
315                         "name": ToBase32(info.Hash[:]),
316                         "size": info.Size,
317                 }, "")
318         }
319         if totalSize > 0 {
320                 ctx.LogI("sp-infos", SDS{
321                         "xx":   string(TTx),
322                         "node": nodeId,
323                         "pkts": len(payloads),
324                         "size": totalSize,
325                 }, "")
326         }
327         return payloadsSplit(payloads)
328 }
329
330 func (state *SPState) StartI(conn ConnDeadlined) error {
331         nodeId := state.Node.Id
332         err := state.Ctx.ensureRxDir(nodeId)
333         if err != nil {
334                 return err
335         }
336         var rxLock *os.File
337         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
338                 rxLock, err = state.Ctx.LockDir(nodeId, TRx)
339                 if err != nil {
340                         return err
341                 }
342         }
343         var txLock *os.File
344         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
345                 txLock, err = state.Ctx.LockDir(nodeId, TTx)
346                 if err != nil {
347                         return err
348                 }
349         }
350         started := time.Now()
351         conf := noise.Config{
352                 CipherSuite: NoiseCipherSuite,
353                 Pattern:     noise.HandshakeIK,
354                 Initiator:   true,
355                 StaticKeypair: noise.DHKey{
356                         Private: state.Ctx.Self.NoisePrv[:],
357                         Public:  state.Ctx.Self.NoisePub[:],
358                 },
359                 PeerStatic: state.Node.NoisePub[:],
360         }
361         hs, err := noise.NewHandshakeState(conf)
362         if err != nil {
363                 return err
364         }
365         state.hs = hs
366         state.payloads = make(chan []byte)
367         state.pings = make(chan struct{})
368         state.infosTheir = make(map[[32]byte]*SPInfo)
369         state.infosOurSeen = make(map[[32]byte]uint8)
370         state.started = started
371         state.rxLock = rxLock
372         state.txLock = txLock
373
374         var infosPayloads [][]byte
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
376                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
377         }
378         var firstPayload []byte
379         if len(infosPayloads) > 0 {
380                 firstPayload = infosPayloads[0]
381         }
382         // Pad first payload, to hide actual number of existing files
383         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
384                 firstPayload = append(firstPayload, SPHaltMarshalized...)
385         }
386
387         var buf []byte
388         var payload []byte
389         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
390         if err != nil {
391                 state.dirUnlock()
392                 return err
393         }
394         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
395         state.Ctx.LogD("sp-start", sds, "sending first message")
396         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
397         if err = state.WriteSP(conn, buf, false); err != nil {
398                 state.Ctx.LogE("sp-start", sds, err, "")
399                 state.dirUnlock()
400                 return err
401         }
402         state.Ctx.LogD("sp-start", sds, "waiting for first message")
403         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
404         if buf, err = state.ReadSP(conn); err != nil {
405                 state.Ctx.LogE("sp-start", sds, err, "")
406                 state.dirUnlock()
407                 return err
408         }
409         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
410         if err != nil {
411                 state.Ctx.LogE("sp-start", sds, err, "")
412                 state.dirUnlock()
413                 return err
414         }
415         state.Ctx.LogD("sp-start", sds, "starting workers")
416         err = state.StartWorkers(conn, infosPayloads, payload)
417         if err != nil {
418                 state.Ctx.LogE("sp-start", sds, err, "")
419                 state.dirUnlock()
420         }
421         return err
422 }
423
424 func (state *SPState) StartR(conn ConnDeadlined) error {
425         started := time.Now()
426         conf := noise.Config{
427                 CipherSuite: NoiseCipherSuite,
428                 Pattern:     noise.HandshakeIK,
429                 Initiator:   false,
430                 StaticKeypair: noise.DHKey{
431                         Private: state.Ctx.Self.NoisePrv[:],
432                         Public:  state.Ctx.Self.NoisePub[:],
433                 },
434         }
435         hs, err := noise.NewHandshakeState(conf)
436         if err != nil {
437                 return err
438         }
439         xxOnly := TRxTx("")
440         state.hs = hs
441         state.payloads = make(chan []byte)
442         state.pings = make(chan struct{})
443         state.infosOurSeen = make(map[[32]byte]uint8)
444         state.infosTheir = make(map[[32]byte]*SPInfo)
445         state.started = started
446         state.xxOnly = xxOnly
447         var buf []byte
448         var payload []byte
449         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
450         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
451         if buf, err = state.ReadSP(conn); err != nil {
452                 state.Ctx.LogE("sp-start", SDS{}, err, "")
453                 return err
454         }
455         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
456                 state.Ctx.LogE("sp-start", SDS{}, err, "")
457                 return err
458         }
459
460         var node *Node
461         for _, n := range state.Ctx.Neigh {
462                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
463                         node = n
464                         break
465                 }
466         }
467         if node == nil {
468                 peerId := ToBase32(state.hs.PeerStatic())
469                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
470                 return errors.New("Unknown peer: " + peerId)
471         }
472         state.Node = node
473         state.rxRate = node.RxRate
474         state.txRate = node.TxRate
475         state.onlineDeadline = node.OnlineDeadline
476         state.maxOnlineTime = node.MaxOnlineTime
477         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
478
479         if state.Ctx.ensureRxDir(node.Id); err != nil {
480                 return err
481         }
482         var rxLock *os.File
483         if xxOnly == "" || xxOnly == TRx {
484                 rxLock, err = state.Ctx.LockDir(node.Id, TRx)
485                 if err != nil {
486                         return err
487                 }
488         }
489         state.rxLock = rxLock
490         var txLock *os.File
491         if xxOnly == "" || xxOnly == TTx {
492                 txLock, err = state.Ctx.LockDir(node.Id, TTx)
493                 if err != nil {
494                         return err
495                 }
496         }
497         state.txLock = txLock
498
499         var infosPayloads [][]byte
500         if xxOnly == "" || xxOnly == TTx {
501                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
502         }
503         var firstPayload []byte
504         if len(infosPayloads) > 0 {
505                 firstPayload = infosPayloads[0]
506         }
507         // Pad first payload, to hide actual number of existing files
508         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
509                 firstPayload = append(firstPayload, SPHaltMarshalized...)
510         }
511
512         state.Ctx.LogD("sp-start", sds, "sending first message")
513         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
514         if err != nil {
515                 state.dirUnlock()
516                 return err
517         }
518         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
519         if err = state.WriteSP(conn, buf, false); err != nil {
520                 state.Ctx.LogE("sp-start", sds, err, "")
521                 state.dirUnlock()
522                 return err
523         }
524         state.Ctx.LogD("sp-start", sds, "starting workers")
525         err = state.StartWorkers(conn, infosPayloads, payload)
526         if err != nil {
527                 state.dirUnlock()
528         }
529         return err
530 }
531
532 func (state *SPState) StartWorkers(
533         conn ConnDeadlined,
534         infosPayloads [][]byte,
535         payload []byte,
536 ) error {
537         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
538         state.isDead = make(chan struct{})
539         if state.maxOnlineTime > 0 {
540                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
541         }
542
543         // Remaining handshake payload sending
544         if len(infosPayloads) > 1 {
545                 state.wg.Add(1)
546                 go func() {
547                         for _, payload := range infosPayloads[1:] {
548                                 state.Ctx.LogD(
549                                         "sp-work",
550                                         SdsAdd(sds, SDS{"size": len(payload)}),
551                                         "queuing remaining payload",
552                                 )
553                                 state.payloads <- payload
554                         }
555                         state.wg.Done()
556                 }()
557         }
558
559         // Processing of first payload and queueing its responses
560         state.Ctx.LogD(
561                 "sp-work",
562                 SdsAdd(sds, SDS{"size": len(payload)}),
563                 "processing first payload",
564         )
565         replies, err := state.ProcessSP(payload)
566         if err != nil {
567                 state.Ctx.LogE("sp-work", sds, err, "")
568                 return err
569         }
570         state.wg.Add(1)
571         go func() {
572                 for _, reply := range replies {
573                         state.Ctx.LogD(
574                                 "sp-work",
575                                 SdsAdd(sds, SDS{"size": len(reply)}),
576                                 "queuing reply",
577                         )
578                         state.payloads <- reply
579                 }
580                 state.wg.Done()
581         }()
582
583         // Periodic jobs
584         state.wg.Add(1)
585         go func() {
586                 deadlineTicker := time.NewTicker(time.Second)
587                 pingTicker := time.NewTicker(PingTimeout)
588                 for {
589                         select {
590                         case <-state.isDead:
591                                 state.wg.Done()
592                                 deadlineTicker.Stop()
593                                 pingTicker.Stop()
594                                 return
595                         case now := <-deadlineTicker.C:
596                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
597                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
598                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
599                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
600                                         state.SetDead()
601                                         conn.Close()
602                                 }
603                         case now := <-pingTicker.C:
604                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
605                                         state.wg.Add(1)
606                                         go func() {
607                                                 state.pings <- struct{}{}
608                                                 state.wg.Done()
609                                                 state.Ctx.LogD("HERE", SDS{}, "PING GOROUTINE QUIT")
610                                         }()
611                                 }
612                         }
613                 }
614         }()
615
616         // Spool checker and INFOs sender of appearing files
617         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
618                 state.wg.Add(1)
619                 go func() {
620                         ticker := time.NewTicker(time.Second)
621                         for {
622                                 select {
623                                 case <-state.isDead:
624                                         state.wg.Done()
625                                         ticker.Stop()
626                                         return
627                                 case <-ticker.C:
628                                         for _, payload := range state.Ctx.infosOur(
629                                                 state.Node.Id,
630                                                 state.Nice,
631                                                 &state.infosOurSeen,
632                                         ) {
633                                                 state.Ctx.LogD(
634                                                         "sp-work",
635                                                         SdsAdd(sds, SDS{"size": len(payload)}),
636                                                         "queuing new info",
637                                                 )
638                                                 state.payloads <- payload
639                                         }
640                                 }
641                         }
642                 }()
643         }
644
645         // Sender
646         state.wg.Add(1)
647         go func() {
648                 defer conn.Close()
649                 defer state.SetDead()
650                 defer state.wg.Done()
651                 for {
652                         if state.NotAlive() {
653                                 return
654                         }
655                         var payload []byte
656                         var ping bool
657                         select {
658                         case <-state.pings:
659                                 state.Ctx.LogD("sp-xmit", sds, "got ping")
660                                 payload = SPPingMarshalized
661                                 ping = true
662                         case payload = <-state.payloads:
663                                 state.Ctx.LogD(
664                                         "sp-xmit",
665                                         SdsAdd(sds, SDS{"size": len(payload)}),
666                                         "got payload",
667                                 )
668                         default:
669                                 state.RLock()
670                                 if len(state.queueTheir) == 0 {
671                                         state.RUnlock()
672                                         time.Sleep(100 * time.Millisecond)
673                                         continue
674                                 }
675                                 freq := state.queueTheir[0].freq
676                                 state.RUnlock()
677                                 if state.txRate > 0 {
678                                         time.Sleep(time.Second / time.Duration(state.txRate))
679                                 }
680                                 sdsp := SdsAdd(sds, SDS{
681                                         "xx":   string(TTx),
682                                         "pkt":  ToBase32(freq.Hash[:]),
683                                         "size": int64(freq.Offset),
684                                 })
685                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
686                                 fd, err := os.Open(filepath.Join(
687                                         state.Ctx.Spool,
688                                         state.Node.Id.String(),
689                                         string(TTx),
690                                         ToBase32(freq.Hash[:]),
691                                 ))
692                                 if err != nil {
693                                         state.Ctx.LogE("sp-file", sdsp, err, "")
694                                         return
695                                 }
696                                 fi, err := fd.Stat()
697                                 if err != nil {
698                                         state.Ctx.LogE("sp-file", sdsp, err, "")
699                                         return
700                                 }
701                                 fullSize := fi.Size()
702                                 var buf []byte
703                                 if freq.Offset < uint64(fullSize) {
704                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
705                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
706                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
707                                                 return
708                                         }
709                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
710                                         n, err := fd.Read(buf)
711                                         if err != nil {
712                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
713                                                 return
714                                         }
715                                         buf = buf[:n]
716                                         state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
717                                 }
718                                 fd.Close()
719                                 payload = MarshalSP(SPTypeFile, SPFile{
720                                         Hash:    freq.Hash,
721                                         Offset:  freq.Offset,
722                                         Payload: buf,
723                                 })
724                                 ourSize := freq.Offset + uint64(len(buf))
725                                 sdsp["size"] = int64(ourSize)
726                                 sdsp["fullsize"] = fullSize
727                                 if state.Ctx.ShowPrgrs {
728                                         Progress("Tx", sdsp)
729                                 }
730                                 state.Lock()
731                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
732                                         if ourSize == uint64(fullSize) {
733                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
734                                                 if len(state.queueTheir) > 1 {
735                                                         state.queueTheir = state.queueTheir[1:]
736                                                 } else {
737                                                         state.queueTheir = state.queueTheir[:0]
738                                                 }
739                                         } else {
740                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
741                                         }
742                                 } else {
743                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
744                                 }
745                                 state.Unlock()
746                         }
747                         state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending")
748                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
749                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
750                                 state.Ctx.LogE("sp-xmit", sds, err, "")
751                                 return
752                         }
753                 }
754         }()
755
756         // Receiver
757         state.wg.Add(1)
758         go func() {
759                 for {
760                         if state.NotAlive() {
761                                 break
762                         }
763                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
764                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
765                         payload, err := state.ReadSP(conn)
766                         if err != nil {
767                                 if err == io.EOF {
768                                         break
769                                 }
770                                 unmarshalErr := err.(*xdr.UnmarshalError)
771                                 netErr, ok := unmarshalErr.Err.(net.Error)
772                                 if ok && netErr.Timeout() {
773                                         continue
774                                 }
775                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
776                                         break
777                                 }
778                                 state.Ctx.LogE("sp-recv", sds, err, "")
779                                 break
780                         }
781                         state.Ctx.LogD(
782                                 "sp-recv",
783                                 SdsAdd(sds, SDS{"size": len(payload)}),
784                                 "got payload",
785                         )
786                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
787                         if err != nil {
788                                 state.Ctx.LogE("sp-recv", sds, err, "")
789                                 break
790                         }
791                         state.Ctx.LogD(
792                                 "sp-recv",
793                                 SdsAdd(sds, SDS{"size": len(payload)}),
794                                 "processing",
795                         )
796                         replies, err := state.ProcessSP(payload)
797                         if err != nil {
798                                 state.Ctx.LogE("sp-recv", sds, err, "")
799                                 break
800                         }
801                         state.wg.Add(1)
802                         go func() {
803                                 for _, reply := range replies {
804                                         state.Ctx.LogD(
805                                                 "sp-recv",
806                                                 SdsAdd(sds, SDS{"size": len(reply)}),
807                                                 "queuing reply",
808                                         )
809                                         state.payloads <- reply
810                                 }
811                                 state.wg.Done()
812                         }()
813                         if state.rxRate > 0 {
814                                 time.Sleep(time.Second / time.Duration(state.rxRate))
815                         }
816                 }
817                 state.SetDead()
818                 state.wg.Done()
819                 state.SetDead()
820                 conn.Close()
821         }()
822
823         return nil
824 }
825
826 func (state *SPState) Wait() {
827         state.wg.Wait()
828         close(state.payloads)
829         close(state.pings)
830         state.dirUnlock()
831         state.Duration = time.Now().Sub(state.started)
832         state.RxSpeed = state.RxBytes
833         state.TxSpeed = state.TxBytes
834         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
835         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
836         if rxDuration > 0 {
837                 state.RxSpeed = state.RxBytes / rxDuration
838         }
839         if txDuration > 0 {
840                 state.TxSpeed = state.TxBytes / txDuration
841         }
842 }
843
844 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
845         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
846         r := bytes.NewReader(payload)
847         var err error
848         var replies [][]byte
849         var infosGot bool
850         for r.Len() > 0 {
851                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
852                 var head SPHead
853                 if _, err = xdr.Unmarshal(r, &head); err != nil {
854                         state.Ctx.LogE("sp-process", sds, err, "")
855                         return nil, err
856                 }
857                 switch head.Type {
858                 case SPTypeHalt:
859                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
860                         state.Lock()
861                         state.queueTheir = nil
862                         state.Unlock()
863                 case SPTypePing:
864                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "")
865                 case SPTypeInfo:
866                         infosGot = true
867                         sdsp := SdsAdd(sds, SDS{"type": "info"})
868                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
869                         var info SPInfo
870                         if _, err = xdr.Unmarshal(r, &info); err != nil {
871                                 state.Ctx.LogE("sp-process", sdsp, err, "")
872                                 return nil, err
873                         }
874                         sdsp = SdsAdd(sds, SDS{
875                                 "pkt":  ToBase32(info.Hash[:]),
876                                 "size": int64(info.Size),
877                                 "nice": int(info.Nice),
878                         })
879                         if !state.listOnly && info.Nice > state.Nice {
880                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
881                                 continue
882                         }
883                         state.Ctx.LogD("sp-process", sdsp, "received")
884                         if !state.listOnly && state.xxOnly == TTx {
885                                 continue
886                         }
887                         state.Lock()
888                         state.infosTheir[*info.Hash] = &info
889                         state.Unlock()
890                         state.Ctx.LogD("sp-process", sdsp, "stating part")
891                         pktPath := filepath.Join(
892                                 state.Ctx.Spool,
893                                 state.Node.Id.String(),
894                                 string(TRx),
895                                 ToBase32(info.Hash[:]),
896                         )
897                         if _, err = os.Stat(pktPath); err == nil {
898                                 state.Ctx.LogI("sp-info", sdsp, "already done")
899                                 if !state.listOnly {
900                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
901                                 }
902                                 continue
903                         }
904                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
905                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
906                                 if !state.listOnly {
907                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
908                                 }
909                                 continue
910                         }
911                         fi, err := os.Stat(pktPath + PartSuffix)
912                         var offset int64
913                         if err == nil {
914                                 offset = fi.Size()
915                         }
916                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
917                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
918                                 continue
919                         }
920                         state.Ctx.LogI(
921                                 "sp-info",
922                                 SdsAdd(sdsp, SDS{"offset": offset}),
923                                 "",
924                         )
925                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
926                                 replies = append(replies, MarshalSP(
927                                         SPTypeFreq,
928                                         SPFreq{info.Hash, uint64(offset)},
929                                 ))
930                         }
931                 case SPTypeFile:
932                         sdsp := SdsAdd(sds, SDS{"type": "file"})
933                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
934                         var file SPFile
935                         if _, err = xdr.Unmarshal(r, &file); err != nil {
936                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
937                                 return nil, err
938                         }
939                         sdsp["xx"] = string(TRx)
940                         sdsp["pkt"] = ToBase32(file.Hash[:])
941                         sdsp["size"] = len(file.Payload)
942                         dirToSync := filepath.Join(
943                                 state.Ctx.Spool,
944                                 state.Node.Id.String(),
945                                 string(TRx),
946                         )
947                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
948                         state.Ctx.LogD("sp-file", sdsp, "opening part")
949                         fd, err := os.OpenFile(
950                                 filePath+PartSuffix,
951                                 os.O_RDWR|os.O_CREATE,
952                                 os.FileMode(0666),
953                         )
954                         if err != nil {
955                                 state.Ctx.LogE("sp-file", sdsp, err, "")
956                                 return nil, err
957                         }
958                         state.Ctx.LogD(
959                                 "sp-file",
960                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
961                                 "seeking",
962                         )
963                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
964                                 state.Ctx.LogE("sp-file", sdsp, err, "")
965                                 fd.Close()
966                                 return nil, err
967                         }
968                         state.Ctx.LogD("sp-file", sdsp, "writing")
969                         _, err = fd.Write(file.Payload)
970                         if err != nil {
971                                 state.Ctx.LogE("sp-file", sdsp, err, "")
972                                 fd.Close()
973                                 return nil, err
974                         }
975                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
976                         sdsp["size"] = ourSize
977                         fullsize := int64(0)
978                         state.RLock()
979                         infoTheir, ok := state.infosTheir[*file.Hash]
980                         state.RUnlock()
981                         if ok {
982                                 fullsize = int64(infoTheir.Size)
983                         }
984                         sdsp["fullsize"] = fullsize
985                         if state.Ctx.ShowPrgrs {
986                                 Progress("Rx", sdsp)
987                         }
988                         if fullsize != ourSize {
989                                 fd.Close()
990                                 continue
991                         }
992                         spWorkersGroup.Wait()
993                         spWorkersGroup.Add(1)
994                         go func() {
995                                 if err := fd.Sync(); err != nil {
996                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
997                                         fd.Close()
998                                         return
999                                 }
1000                                 state.wg.Add(1)
1001                                 defer state.wg.Done()
1002                                 fd.Seek(0, io.SeekStart)
1003                                 state.Ctx.LogD("sp-file", sdsp, "checking")
1004                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
1005                                 fd.Close()
1006                                 if err != nil || !gut {
1007                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
1008                                         return
1009                                 }
1010                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
1011                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1012                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
1013                                         return
1014                                 }
1015                                 if err = DirSync(dirToSync); err != nil {
1016                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
1017                                         return
1018                                 }
1019                                 state.Lock()
1020                                 delete(state.infosTheir, *file.Hash)
1021                                 state.Unlock()
1022                                 spWorkersGroup.Done()
1023                                 state.wg.Add(1)
1024                                 go func() {
1025                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1026                                         state.wg.Done()
1027                                 }()
1028                         }()
1029                 case SPTypeDone:
1030                         sdsp := SdsAdd(sds, SDS{"type": "done"})
1031                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1032                         var done SPDone
1033                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1034                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
1035                                 return nil, err
1036                         }
1037                         sdsp["pkt"] = ToBase32(done.Hash[:])
1038                         state.Ctx.LogD("sp-done", sdsp, "removing")
1039                         err := os.Remove(filepath.Join(
1040                                 state.Ctx.Spool,
1041                                 state.Node.Id.String(),
1042                                 string(TTx),
1043                                 ToBase32(done.Hash[:]),
1044                         ))
1045                         sdsp["xx"] = string(TTx)
1046                         if err == nil {
1047                                 state.Ctx.LogI("sp-done", sdsp, "")
1048                         } else {
1049                                 state.Ctx.LogE("sp-done", sdsp, err, "")
1050                         }
1051                 case SPTypeFreq:
1052                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
1053                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1054                         var freq SPFreq
1055                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1056                                 state.Ctx.LogE("sp-process", sdsp, err, "")
1057                                 return nil, err
1058                         }
1059                         sdsp["pkt"] = ToBase32(freq.Hash[:])
1060                         sdsp["offset"] = freq.Offset
1061                         state.Ctx.LogD("sp-process", sdsp, "queueing")
1062                         nice, exists := state.infosOurSeen[*freq.Hash]
1063                         if exists {
1064                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1065                                         state.Lock()
1066                                         insertIdx := 0
1067                                         var freqWithNice *FreqWithNice
1068                                         for insertIdx, freqWithNice = range state.queueTheir {
1069                                                 if freqWithNice.nice > nice {
1070                                                         break
1071                                                 }
1072                                         }
1073                                         state.queueTheir = append(state.queueTheir, nil)
1074                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1075                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1076                                         state.Unlock()
1077                                 } else {
1078                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
1079                                 }
1080                         } else {
1081                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
1082                         }
1083                 default:
1084                         state.Ctx.LogE(
1085                                 "sp-process",
1086                                 SdsAdd(sds, SDS{"type": head.Type}),
1087                                 errors.New("unknown type"),
1088                                 "",
1089                         )
1090                         return nil, BadPktType
1091                 }
1092                 if head.Type != SPTypePing {
1093                         state.RxLastNonPing = state.RxLastSeen
1094                 }
1095         }
1096         if infosGot {
1097                 var pkts int
1098                 var size uint64
1099                 state.RLock()
1100                 for _, info := range state.infosTheir {
1101                         pkts++
1102                         size += info.Size
1103                 }
1104                 state.RUnlock()
1105                 state.Ctx.LogI("sp-infos", SDS{
1106                         "xx":   string(TRx),
1107                         "node": state.Node.Id,
1108                         "pkts": pkts,
1109                         "size": int64(size),
1110                 }, "")
1111         }
1112         return payloadsSplit(replies), nil
1113 }