Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/sp.go
Simple rate limiter
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
const (
	// MaxSPSize is the size limit, in bytes, used when packing payloads
	// into a single outbound SP message and when padding the first one.
	MaxSPSize = 1<<16 - 256
	// PartSuffix is not referenced in the visible part of this file;
	// presumably the filename suffix of partially received packets --
	// TODO confirm against its users.
	PartSuffix = ".part"
	// DefaultDeadline is the number of seconds used for connection
	// read and write deadlines.
	DefaultDeadline = 10
)
43
var (
	// MagicNNCPLv1 is the magic number carried by every SPRaw frame;
	// ReadSP rejects frames whose magic differs.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// Marshalled sizes of sample SPHead, SPInfo, SPFreq and SPFile
	// values, computed once in init().
	SPHeadOverhead int
	SPInfoOverhead int
	SPFreqOverhead int
	SPFileOverhead int
	// SPHaltMarshalized is meant to hold a marshalled SPHead of type
	// SPTypeHalt; it is appended repeatedly to pad the first payload.
	SPHaltMarshalized []byte

	// NoiseCipherSuite is the Noise cipher suite used for the
	// handshake: DH25519, ChaChaPoly and BLAKE2b.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// spWorkersGroup is not referenced in the visible part of this
	// file -- NOTE(review): confirm where it is waited on.
	spWorkersGroup sync.WaitGroup
)
61
// SPType identifies the kind of packet that follows an SPHead.
type SPType uint8

// Packet kinds, numbered consecutively from zero.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
)
71
// SPHead is the header marshalled in front of every packet by MarshalSP;
// it tells the reader which packet type follows.
type SPHead struct {
	Type SPType
}
75
// SPInfo announces one packet: its niceness, its size in bytes and its
// hash. infosOur builds one per outbound job.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}
81
// SPFreq names a packet by hash together with the offset from which its
// content is to be sent; the transmitting worker opens the packet, seeks
// to Offset and sends data from there.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}
86
// SPFile carries one chunk of a packet: Payload is the data read from the
// packet identified by Hash, starting at Offset.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}
92
// SPDone refers to a packet by hash; it is sent in reply to an info about
// a packet that already exists in the receive spool.
type SPDone struct {
	Hash *[32]byte
}
96
// SPRaw is the outer frame written by WriteSP and read by ReadSP: a magic
// number followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
101
// FreqWithNice pairs a file request with a niceness value; SPState keeps
// its queue of requests as a slice of these.
// NOTE(review): presumably nice is the niceness of the requested packet --
// confirm where the queue is filled.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
106
107 func init() {
108         var buf bytes.Buffer
109         spHead := SPHead{Type: SPTypeHalt}
110         if _, err := xdr.Marshal(&buf, spHead); err != nil {
111                 panic(err)
112         }
113         copy(SPHaltMarshalized, buf.Bytes())
114         SPHeadOverhead = buf.Len()
115         buf.Reset()
116
117         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
118         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
119                 panic(err)
120         }
121         SPInfoOverhead = buf.Len()
122         buf.Reset()
123
124         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
125         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
126                 panic(err)
127         }
128         SPFreqOverhead = buf.Len()
129         buf.Reset()
130
131         spFile := SPFile{Hash: new([32]byte), Offset: 123}
132         if _, err := xdr.Marshal(&buf, spFile); err != nil {
133                 panic(err)
134         }
135         SPFileOverhead = buf.Len()
136 }
137
138 func MarshalSP(typ SPType, sp interface{}) []byte {
139         var buf bytes.Buffer
140         var err error
141         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
142                 panic(err)
143         }
144         if _, err = xdr.Marshal(&buf, sp); err != nil {
145                 panic(err)
146         }
147         return buf.Bytes()
148 }
149
150 func payloadsSplit(payloads [][]byte) [][]byte {
151         var outbounds [][]byte
152         outbound := make([]byte, 0, MaxSPSize)
153         for i, payload := range payloads {
154                 outbound = append(outbound, payload...)
155                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
156                         outbounds = append(outbounds, outbound)
157                         outbound = make([]byte, 0, MaxSPSize)
158                 }
159         }
160         if len(outbound) > 0 {
161                 outbounds = append(outbounds, outbound)
162         }
163         return outbounds
164 }
165
// SPState holds everything belonging to one SP session with a single
// node: handshake and cipher states, queues, spool locks and transfer
// statistics.
type SPState struct {
	ctx  *Ctx
	Node *Node
	// Both limits are in seconds; a zero maxOnlineTime disables that
	// check in NotAlive.
	onlineDeadline uint
	maxOnlineTime  uint
	nice           uint8
	// Noise handshake state and the two cipher states obtained from it:
	// csOur encrypts outgoing payloads, csTheir decrypts incoming ones.
	hs      *noise.HandshakeState
	csOur   *noise.CipherState
	csTheir *noise.CipherState
	// payloads feeds ready-made payloads to the transmitting worker.
	payloads chan []byte
	// infosTheir keeps infos received from the peer, keyed by hash;
	// infosOurSeen records the hashes already collected by infosOur.
	infosTheir   map[[32]byte]*SPInfo
	infosOurSeen map[[32]byte]uint8
	// queueTheir is the queue of file requests served by the
	// transmitting worker, front element first.
	queueTheir []*FreqWithNice
	// wg tracks the transmitting and receiving workers; Wait blocks on it.
	wg sync.WaitGroup
	// Byte counters and the time of the last successful read and write.
	RxBytes    int64
	RxLastSeen time.Time
	TxBytes    int64
	TxLastSeen time.Time
	started    time.Time
	// Duration, RxSpeed and TxSpeed are filled in by Wait.
	Duration time.Duration
	RxSpeed  int64
	TxSpeed  int64
	// Spool directory locks; either may be nil when xxOnly restricts the
	// session to one direction.
	rxLock *os.File
	txLock *os.File
	xxOnly TRxTx
	// When positive, the workers sleep time.Second/rate between packets.
	rxRate int
	txRate int
	// isDead is set when either worker exits.
	isDead bool
	// The embedded lock is taken around accesses to queueTheir and
	// infosTheir.
	sync.RWMutex
}
196
197 func (state *SPState) NotAlive() bool {
198         if state.isDead {
199                 return true
200         }
201         now := time.Now()
202         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
203                 return true
204         }
205         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
206 }
207
208 func (state *SPState) dirUnlock() {
209         state.ctx.UnlockDir(state.rxLock)
210         state.ctx.UnlockDir(state.txLock)
211 }
212
213 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
214         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
215         if err == nil {
216                 state.TxLastSeen = time.Now()
217                 state.TxBytes += int64(n)
218         }
219         return err
220 }
221
222 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
223         var sp SPRaw
224         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
225         if err != nil {
226                 return nil, err
227         }
228         state.RxLastSeen = time.Now()
229         state.RxBytes += int64(n)
230         if sp.Magic != MagicNNCPLv1 {
231                 return nil, BadMagic
232         }
233         return sp.Payload, nil
234 }
235
236 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
237         var infos []*SPInfo
238         var totalSize int64
239         for job := range ctx.Jobs(nodeId, TTx) {
240                 job.Fd.Close()
241                 if job.PktEnc.Nice > nice {
242                         continue
243                 }
244                 if _, known := (*seen)[*job.HshValue]; known {
245                         continue
246                 }
247                 totalSize += job.Size
248                 infos = append(infos, &SPInfo{
249                         Nice: job.PktEnc.Nice,
250                         Size: uint64(job.Size),
251                         Hash: job.HshValue,
252                 })
253                 (*seen)[*job.HshValue] = job.PktEnc.Nice
254         }
255         sort.Sort(ByNice(infos))
256         var payloads [][]byte
257         for _, info := range infos {
258                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
259                 ctx.LogD("sp-info-our", SDS{
260                         "node": nodeId,
261                         "name": ToBase32(info.Hash[:]),
262                         "size": strconv.FormatInt(int64(info.Size), 10),
263                 }, "")
264         }
265         if totalSize > 0 {
266                 ctx.LogI("sp-infos", SDS{
267                         "xx":   string(TTx),
268                         "node": nodeId,
269                         "pkts": strconv.Itoa(len(payloads)),
270                         "size": strconv.FormatInt(totalSize, 10),
271                 }, "")
272         }
273         return payloadsSplit(payloads)
274 }
275
276 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, rxRate, txRate int, onlineDeadline, maxOnlineTime uint) (*SPState, error) {
277         err := ctx.ensureRxDir(nodeId)
278         if err != nil {
279                 return nil, err
280         }
281         var rxLock *os.File
282         if xxOnly == "" || xxOnly == TRx {
283                 rxLock, err = ctx.LockDir(nodeId, TRx)
284                 if err != nil {
285                         return nil, err
286                 }
287         }
288         var txLock *os.File
289         if xxOnly == "" || xxOnly == TTx {
290                 txLock, err = ctx.LockDir(nodeId, TTx)
291                 if err != nil {
292                         return nil, err
293                 }
294         }
295         started := time.Now()
296         node := ctx.Neigh[*nodeId]
297         conf := noise.Config{
298                 CipherSuite: NoiseCipherSuite,
299                 Pattern:     noise.HandshakeIK,
300                 Initiator:   true,
301                 StaticKeypair: noise.DHKey{
302                         Private: ctx.Self.NoisePrv[:],
303                         Public:  ctx.Self.NoisePub[:],
304                 },
305                 PeerStatic: node.NoisePub[:],
306         }
307         hs, err := noise.NewHandshakeState(conf)
308         if err != nil {
309                 return nil, err
310         }
311         state := SPState{
312                 ctx:            ctx,
313                 hs:             hs,
314                 Node:           node,
315                 onlineDeadline: onlineDeadline,
316                 maxOnlineTime:  maxOnlineTime,
317                 nice:           nice,
318                 payloads:       make(chan []byte),
319                 infosTheir:     make(map[[32]byte]*SPInfo),
320                 infosOurSeen:   make(map[[32]byte]uint8),
321                 started:        started,
322                 rxLock:         rxLock,
323                 txLock:         txLock,
324                 xxOnly:         xxOnly,
325                 rxRate:         rxRate,
326                 txRate:         txRate,
327         }
328
329         var infosPayloads [][]byte
330         if xxOnly == "" || xxOnly == TTx {
331                 infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
332         }
333         var firstPayload []byte
334         if len(infosPayloads) > 0 {
335                 firstPayload = infosPayloads[0]
336         }
337         // Pad first payload, to hide actual number of existing files
338         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
339                 firstPayload = append(firstPayload, SPHaltMarshalized...)
340         }
341
342         var buf []byte
343         var payload []byte
344         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
345         if err != nil {
346                 state.dirUnlock()
347                 return nil, err
348         }
349         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
350         ctx.LogD("sp-start", sds, "sending first message")
351         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
352         if err = state.WriteSP(conn, buf); err != nil {
353                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
354                 state.dirUnlock()
355                 return nil, err
356         }
357         ctx.LogD("sp-start", sds, "waiting for first message")
358         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
359         if buf, err = state.ReadSP(conn); err != nil {
360                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
361                 state.dirUnlock()
362                 return nil, err
363         }
364         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
365         if err != nil {
366                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
367                 state.dirUnlock()
368                 return nil, err
369         }
370         ctx.LogD("sp-start", sds, "starting workers")
371         err = state.StartWorkers(conn, infosPayloads, payload)
372         if err != nil {
373                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
374                 state.dirUnlock()
375                 return nil, err
376         }
377         return &state, err
378 }
379
380 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) {
381         started := time.Now()
382         conf := noise.Config{
383                 CipherSuite: NoiseCipherSuite,
384                 Pattern:     noise.HandshakeIK,
385                 Initiator:   false,
386                 StaticKeypair: noise.DHKey{
387                         Private: ctx.Self.NoisePrv[:],
388                         Public:  ctx.Self.NoisePub[:],
389                 },
390         }
391         hs, err := noise.NewHandshakeState(conf)
392         if err != nil {
393                 return nil, err
394         }
395         state := SPState{
396                 ctx:          ctx,
397                 hs:           hs,
398                 nice:         nice,
399                 payloads:     make(chan []byte),
400                 infosOurSeen: make(map[[32]byte]uint8),
401                 infosTheir:   make(map[[32]byte]*SPInfo),
402                 started:      started,
403                 xxOnly:       xxOnly,
404         }
405         var buf []byte
406         var payload []byte
407         ctx.LogD(
408                 "sp-start",
409                 SDS{"nice": strconv.Itoa(int(nice))},
410                 "waiting for first message",
411         )
412         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
413         if buf, err = state.ReadSP(conn); err != nil {
414                 ctx.LogE("sp-start", SDS{"err": err}, "")
415                 return nil, err
416         }
417         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
418                 ctx.LogE("sp-start", SDS{"err": err}, "")
419                 return nil, err
420         }
421
422         var node *Node
423         for _, node = range ctx.Neigh {
424                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
425                         break
426                 }
427         }
428         if node == nil {
429                 peerId := ToBase32(state.hs.PeerStatic())
430                 ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
431                 return nil, errors.New("Unknown peer: " + peerId)
432         }
433         state.Node = node
434         state.rxRate = node.RxRate
435         state.txRate = node.TxRate
436         state.onlineDeadline = node.OnlineDeadline
437         state.maxOnlineTime = node.MaxOnlineTime
438         sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
439
440         if ctx.ensureRxDir(node.Id); err != nil {
441                 return nil, err
442         }
443         var rxLock *os.File
444         if xxOnly == "" || xxOnly == TRx {
445                 rxLock, err = ctx.LockDir(node.Id, TRx)
446                 if err != nil {
447                         return nil, err
448                 }
449         }
450         state.rxLock = rxLock
451         var txLock *os.File
452         if xxOnly == "" || xxOnly == TTx {
453                 txLock, err = ctx.LockDir(node.Id, TTx)
454                 if err != nil {
455                         return nil, err
456                 }
457         }
458         state.txLock = txLock
459
460         var infosPayloads [][]byte
461         if xxOnly == "" || xxOnly == TTx {
462                 infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
463         }
464         var firstPayload []byte
465         if len(infosPayloads) > 0 {
466                 firstPayload = infosPayloads[0]
467         }
468         // Pad first payload, to hide actual number of existing files
469         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
470                 firstPayload = append(firstPayload, SPHaltMarshalized...)
471         }
472
473         ctx.LogD("sp-start", sds, "sending first message")
474         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
475         if err != nil {
476                 state.dirUnlock()
477                 return nil, err
478         }
479         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
480         if err = state.WriteSP(conn, buf); err != nil {
481                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
482                 state.dirUnlock()
483                 return nil, err
484         }
485         ctx.LogD("sp-start", sds, "starting workers")
486         err = state.StartWorkers(conn, infosPayloads, payload)
487         if err != nil {
488                 state.dirUnlock()
489                 return nil, err
490         }
491         return &state, err
492 }
493
494 func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
495         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
496         if len(infosPayloads) > 1 {
497                 go func() {
498                         for _, payload := range infosPayloads[1:] {
499                                 state.ctx.LogD(
500                                         "sp-work",
501                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
502                                         "queuing remaining payload",
503                                 )
504                                 state.payloads <- payload
505                         }
506                 }()
507         }
508         state.ctx.LogD(
509                 "sp-work",
510                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
511                 "processing first payload",
512         )
513         replies, err := state.ProcessSP(payload)
514         if err != nil {
515                 state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
516                 return err
517         }
518
519         go func() {
520                 for _, reply := range replies {
521                         state.ctx.LogD(
522                                 "sp-work",
523                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
524                                 "queuing reply",
525                         )
526                         state.payloads <- reply
527                 }
528         }()
529
530         if state.xxOnly == "" || state.xxOnly == TTx {
531                 go func() {
532                         for range time.Tick(time.Second) {
533                                 for _, payload := range state.ctx.infosOur(
534                                         state.Node.Id,
535                                         state.nice,
536                                         &state.infosOurSeen,
537                                 ) {
538                                         state.ctx.LogD(
539                                                 "sp-work",
540                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
541                                                 "queuing new info",
542                                         )
543                                         state.payloads <- payload
544                                 }
545                         }
546                 }()
547         }
548
549         state.wg.Add(1)
550         go func() {
551                 defer func() {
552                         state.isDead = true
553                         state.wg.Done()
554                 }()
555                 for {
556                         if state.NotAlive() {
557                                 return
558                         }
559                         var payload []byte
560                         select {
561                         case payload = <-state.payloads:
562                                 state.ctx.LogD(
563                                         "sp-xmit",
564                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
565                                         "got payload",
566                                 )
567                         default:
568                         }
569                         if payload == nil {
570                                 state.RLock()
571                                 if len(state.queueTheir) == 0 {
572                                         state.ctx.LogD("sp-xmit", sds, "file queue is empty")
573                                         state.RUnlock()
574                                         time.Sleep(100 * time.Millisecond)
575                                         continue
576                                 }
577                                 freq := state.queueTheir[0].freq
578                                 state.RUnlock()
579
580                                 if state.txRate > 0 {
581                                         time.Sleep(time.Second / time.Duration(state.txRate))
582                                 }
583
584                                 sdsp := SdsAdd(sds, SDS{
585                                         "xx":   string(TTx),
586                                         "hash": ToBase32(freq.Hash[:]),
587                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
588                                 })
589                                 state.ctx.LogD("sp-file", sdsp, "queueing")
590                                 fd, err := os.Open(filepath.Join(
591                                         state.ctx.Spool,
592                                         state.Node.Id.String(),
593                                         string(TTx),
594                                         ToBase32(freq.Hash[:]),
595                                 ))
596                                 if err != nil {
597                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
598                                         break
599                                 }
600                                 fi, err := fd.Stat()
601                                 if err != nil {
602                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
603                                         break
604                                 }
605                                 fullSize := uint64(fi.Size())
606                                 var buf []byte
607                                 if freq.Offset < fullSize {
608                                         state.ctx.LogD("sp-file", sdsp, "seeking")
609                                         if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
610                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
611                                                 break
612                                         }
613                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
614                                         n, err := fd.Read(buf)
615                                         if err != nil {
616                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
617                                                 break
618                                         }
619                                         buf = buf[:n]
620                                         state.ctx.LogD(
621                                                 "sp-file",
622                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
623                                                 "read",
624                                         )
625                                 }
626                                 fd.Close()
627                                 payload = MarshalSP(SPTypeFile, SPFile{
628                                         Hash:    freq.Hash,
629                                         Offset:  freq.Offset,
630                                         Payload: buf,
631                                 })
632                                 ourSize := freq.Offset + uint64(len(buf))
633                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
634                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
635                                 state.ctx.LogP("sp-file", sdsp, "")
636                                 state.Lock()
637                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
638                                         if ourSize == fullSize {
639                                                 state.ctx.LogD("sp-file", sdsp, "finished")
640                                                 if len(state.queueTheir) > 1 {
641                                                         state.queueTheir = state.queueTheir[1:]
642                                                 } else {
643                                                         state.queueTheir = state.queueTheir[:0]
644                                                 }
645                                         } else {
646                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
647                                         }
648                                 } else {
649                                         state.ctx.LogD("sp-file", sdsp, "queue disappeared")
650                                 }
651                                 state.Unlock()
652                         }
653                         state.ctx.LogD(
654                                 "sp-xmit",
655                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
656                                 "sending",
657                         )
658                         conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
659                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
660                                 state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
661                                 break
662                         }
663                 }
664         }()
665
666         state.wg.Add(1)
667         go func() {
668                 defer func() {
669                         state.isDead = true
670                         state.wg.Done()
671                 }()
672                 for {
673                         if state.NotAlive() {
674                                 return
675                         }
676                         state.ctx.LogD("sp-recv", sds, "waiting for payload")
677                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
678                         payload, err := state.ReadSP(conn)
679                         if err != nil {
680                                 unmarshalErr := err.(*xdr.UnmarshalError)
681                                 netErr, ok := unmarshalErr.Err.(net.Error)
682                                 if ok && netErr.Timeout() {
683                                         continue
684                                 }
685                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
686                                         break
687                                 }
688                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
689                                 break
690                         }
691                         state.ctx.LogD(
692                                 "sp-recv",
693                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
694                                 "got payload",
695                         )
696                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
697                         if err != nil {
698                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
699                                 break
700                         }
701                         state.ctx.LogD(
702                                 "sp-recv",
703                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
704                                 "processing",
705                         )
706                         replies, err := state.ProcessSP(payload)
707                         if err != nil {
708                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
709                                 break
710                         }
711                         go func() {
712                                 for _, reply := range replies {
713                                         state.ctx.LogD(
714                                                 "sp-recv",
715                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
716                                                 "queuing reply",
717                                         )
718                                         state.payloads <- reply
719                                 }
720                         }()
721                         if state.rxRate > 0 {
722                                 time.Sleep(time.Second / time.Duration(state.rxRate))
723                         }
724                 }
725         }()
726
727         return nil
728 }
729
730 func (state *SPState) Wait() {
731         state.wg.Wait()
732         state.dirUnlock()
733         state.Duration = time.Now().Sub(state.started)
734         state.RxSpeed = state.RxBytes
735         state.TxSpeed = state.TxBytes
736         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
737         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
738         if rxDuration > 0 {
739                 state.RxSpeed = state.RxBytes / rxDuration
740         }
741         if txDuration > 0 {
742                 state.TxSpeed = state.TxBytes / txDuration
743         }
744 }
745
746 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
747         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
748         r := bytes.NewReader(payload)
749         var err error
750         var replies [][]byte
751         var infosGot bool
752         for r.Len() > 0 {
753                 state.ctx.LogD("sp-process", sds, "unmarshaling header")
754                 var head SPHead
755                 if _, err = xdr.Unmarshal(r, &head); err != nil {
756                         state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
757                         return nil, err
758                 }
759                 switch head.Type {
760                 case SPTypeInfo:
761                         infosGot = true
762                         sdsp := SdsAdd(sds, SDS{"type": "info"})
763                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
764                         var info SPInfo
765                         if _, err = xdr.Unmarshal(r, &info); err != nil {
766                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
767                                 return nil, err
768                         }
769                         sdsp = SdsAdd(sds, SDS{
770                                 "hash": ToBase32(info.Hash[:]),
771                                 "size": strconv.FormatInt(int64(info.Size), 10),
772                         })
773                         if info.Nice > state.nice {
774                                 state.ctx.LogD("sp-process", sdsp, "too nice")
775                                 continue
776                         }
777                         state.ctx.LogD("sp-process", sdsp, "received")
778                         if state.xxOnly == TTx {
779                                 continue
780                         }
781                         state.Lock()
782                         state.infosTheir[*info.Hash] = &info
783                         state.Unlock()
784                         state.ctx.LogD("sp-process", sdsp, "stating part")
785                         pktPath := filepath.Join(
786                                 state.ctx.Spool,
787                                 state.Node.Id.String(),
788                                 string(TRx),
789                                 ToBase32(info.Hash[:]),
790                         )
791                         if _, err = os.Stat(pktPath); err == nil {
792                                 state.ctx.LogD("sp-process", sdsp, "already done")
793                                 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
794                                 continue
795                         }
796                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
797                                 state.ctx.LogD("sp-process", sdsp, "already seen")
798                                 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
799                                 continue
800                         }
801                         fi, err := os.Stat(pktPath + PartSuffix)
802                         var offset int64
803                         if err == nil {
804                                 offset = fi.Size()
805                                 state.ctx.LogD(
806                                         "sp-process",
807                                         SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
808                                         "part exists",
809                                 )
810                         }
811                         replies = append(replies, MarshalSP(
812                                 SPTypeFreq,
813                                 SPFreq{info.Hash, uint64(offset)},
814                         ))
815                 case SPTypeFile:
816                         state.ctx.LogD(
817                                 "sp-process",
818                                 SdsAdd(sds, SDS{"type": "file"}),
819                                 "unmarshaling packet",
820                         )
821                         var file SPFile
822                         if _, err = xdr.Unmarshal(r, &file); err != nil {
823                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
824                                         "err":  err,
825                                         "type": "file",
826                                 }), "")
827                                 return nil, err
828                         }
829                         sdsp := SdsAdd(sds, SDS{
830                                 "xx":   string(TRx),
831                                 "hash": ToBase32(file.Hash[:]),
832                                 "size": strconv.Itoa(len(file.Payload)),
833                         })
834                         filePath := filepath.Join(
835                                 state.ctx.Spool,
836                                 state.Node.Id.String(),
837                                 string(TRx),
838                                 ToBase32(file.Hash[:]),
839                         )
840                         state.ctx.LogD("sp-file", sdsp, "opening part")
841                         fd, err := os.OpenFile(
842                                 filePath+PartSuffix,
843                                 os.O_RDWR|os.O_CREATE,
844                                 os.FileMode(0600),
845                         )
846                         if err != nil {
847                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
848                                 return nil, err
849                         }
850                         state.ctx.LogD(
851                                 "sp-file",
852                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
853                                 "seeking",
854                         )
855                         if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
856                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
857                                 fd.Close()
858                                 return nil, err
859                         }
860                         state.ctx.LogD("sp-file", sdsp, "writing")
861                         _, err = fd.Write(file.Payload)
862                         if err != nil {
863                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
864                                 fd.Close()
865                                 return nil, err
866                         }
867                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
868                         state.RLock()
869                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
870                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
871                         state.ctx.LogP("sp-file", sdsp, "")
872                         if state.infosTheir[*file.Hash].Size != ourSize {
873                                 state.RUnlock()
874                                 fd.Close()
875                                 continue
876                         }
877                         state.RUnlock()
878                         spWorkersGroup.Wait()
879                         spWorkersGroup.Add(1)
880                         go func() {
881                                 if err := fd.Sync(); err != nil {
882                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
883                                         fd.Close()
884                                         return
885                                 }
886                                 state.wg.Add(1)
887                                 defer state.wg.Done()
888                                 fd.Seek(0, 0)
889                                 state.ctx.LogD("sp-file", sdsp, "checking")
890                                 gut, err := Check(fd, file.Hash[:])
891                                 fd.Close()
892                                 if err != nil || !gut {
893                                         state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
894                                         return
895                                 }
896                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
897                                 os.Rename(filePath+PartSuffix, filePath)
898                                 state.Lock()
899                                 delete(state.infosTheir, *file.Hash)
900                                 state.Unlock()
901                                 spWorkersGroup.Done()
902                                 go func() {
903                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
904                                 }()
905                         }()
906                 case SPTypeDone:
907                         state.ctx.LogD(
908                                 "sp-process",
909                                 SdsAdd(sds, SDS{"type": "done"}),
910                                 "unmarshaling packet",
911                         )
912                         var done SPDone
913                         if _, err = xdr.Unmarshal(r, &done); err != nil {
914                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
915                                         "type": "done",
916                                         "err":  err,
917                                 }), "")
918                                 return nil, err
919                         }
920                         sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
921                         state.ctx.LogD("sp-done", sdsp, "removing")
922                         err := os.Remove(filepath.Join(
923                                 state.ctx.Spool,
924                                 state.Node.Id.String(),
925                                 string(TTx),
926                                 ToBase32(done.Hash[:]),
927                         ))
928                         if err == nil {
929                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
930                         } else {
931                                 state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
932                         }
933                 case SPTypeFreq:
934                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
935                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
936                         var freq SPFreq
937                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
938                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
939                                 return nil, err
940                         }
941                         state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
942                                 "hash":   ToBase32(freq.Hash[:]),
943                                 "offset": strconv.FormatInt(int64(freq.Offset), 10),
944                         }), "queueing")
945                         nice, exists := state.infosOurSeen[*freq.Hash]
946                         if exists {
947                                 state.Lock()
948                                 insertIdx := 0
949                                 var freqWithNice *FreqWithNice
950                                 for insertIdx, freqWithNice = range state.queueTheir {
951                                         if freqWithNice.nice > nice {
952                                                 break
953                                         }
954                                 }
955                                 state.queueTheir = append(state.queueTheir, nil)
956                                 copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
957                                 state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
958                                 state.Unlock()
959                         } else {
960                                 state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
961                                         "hash":   ToBase32(freq.Hash[:]),
962                                         "offset": strconv.FormatInt(int64(freq.Offset), 10),
963                                 }), "unknown")
964                         }
965                 case SPTypeHalt:
966                         sdsp := SdsAdd(sds, SDS{"type": "halt"})
967                         state.ctx.LogD("sp-process", sdsp, "")
968                         state.Lock()
969                         state.queueTheir = nil
970                         state.Unlock()
971                 default:
972                         state.ctx.LogE(
973                                 "sp-process",
974                                 SdsAdd(sds, SDS{"type": head.Type}),
975                                 "unknown",
976                         )
977                         return nil, BadPktType
978                 }
979         }
980         if infosGot {
981                 var pkts int
982                 var size uint64
983                 state.RLock()
984                 for _, info := range state.infosTheir {
985                         pkts++
986                         size += info.Size
987                 }
988                 state.RUnlock()
989                 state.ctx.LogI("sp-infos", SDS{
990                         "xx":   string(TRx),
991                         "node": state.Node.Id,
992                         "pkts": strconv.Itoa(pkts),
993                         "size": strconv.FormatInt(int64(size), 10),
994                 }, "")
995         }
996         return payloadsSplit(replies), nil
997 }