]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Fix error type assertion
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "net"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34 )
35
const (
	// MaxSPSize is the upper bound used for a single aggregated SP
	// payload: it is the capacity of batched payload buffers, the target
	// size of first-payload padding and the base for file chunk buffers.
	MaxSPSize = (1 << 16) - 256

	// PartSuffix is the ".part" filename suffix.
	// NOTE(review): presumably marks partially received packets; it is
	// not referenced in this part of the file -- confirm.
	PartSuffix = ".part"

	// DefaultDeadline is the number of seconds used for connection
	// read and write deadlines.
	DefaultDeadline = 10
)
41
42 var (
43         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
44
45         SPHeadOverhead    int
46         SPInfoOverhead    int
47         SPFreqOverhead    int
48         SPFileOverhead    int
49         SPHaltMarshalized []byte
50
51         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
52                 noise.DH25519,
53                 noise.CipherChaChaPoly,
54                 noise.HashBLAKE2b,
55         )
56
57         spWorkersGroup sync.WaitGroup
58 )
59
// SPType identifies the kind of SP packet announced by an SPHead.
type SPType uint8

// SP packet types. The numeric values are marshalled inside SPHead, so
// the order of these constants must not change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
)
69
70 type SPHead struct {
71         Type SPType
72 }
73
74 type SPInfo struct {
75         Nice uint8
76         Size uint64
77         Hash *[32]byte
78 }
79
80 type SPFreq struct {
81         Hash   *[32]byte
82         Offset uint64
83 }
84
85 type SPFile struct {
86         Hash    *[32]byte
87         Offset  uint64
88         Payload []byte
89 }
90
91 type SPDone struct {
92         Hash *[32]byte
93 }
94
95 type SPRaw struct {
96         Magic   [8]byte
97         Payload []byte
98 }
99
100 type FreqWithNice struct {
101         freq *SPFreq
102         nice uint8
103 }
104
105 type ConnDeadlined interface {
106         io.ReadWriteCloser
107         SetReadDeadline(t time.Time) error
108         SetWriteDeadline(t time.Time) error
109 }
110
111 func init() {
112         var buf bytes.Buffer
113         spHead := SPHead{Type: SPTypeHalt}
114         if _, err := xdr.Marshal(&buf, spHead); err != nil {
115                 panic(err)
116         }
117         copy(SPHaltMarshalized, buf.Bytes())
118         SPHeadOverhead = buf.Len()
119         buf.Reset()
120
121         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
122         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
123                 panic(err)
124         }
125         SPInfoOverhead = buf.Len()
126         buf.Reset()
127
128         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
129         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
130                 panic(err)
131         }
132         SPFreqOverhead = buf.Len()
133         buf.Reset()
134
135         spFile := SPFile{Hash: new([32]byte), Offset: 123}
136         if _, err := xdr.Marshal(&buf, spFile); err != nil {
137                 panic(err)
138         }
139         SPFileOverhead = buf.Len()
140 }
141
142 func MarshalSP(typ SPType, sp interface{}) []byte {
143         var buf bytes.Buffer
144         var err error
145         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
146                 panic(err)
147         }
148         if _, err = xdr.Marshal(&buf, sp); err != nil {
149                 panic(err)
150         }
151         return buf.Bytes()
152 }
153
154 func payloadsSplit(payloads [][]byte) [][]byte {
155         var outbounds [][]byte
156         outbound := make([]byte, 0, MaxSPSize)
157         for i, payload := range payloads {
158                 outbound = append(outbound, payload...)
159                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
160                         outbounds = append(outbounds, outbound)
161                         outbound = make([]byte, 0, MaxSPSize)
162                 }
163         }
164         if len(outbound) > 0 {
165                 outbounds = append(outbounds, outbound)
166         }
167         return outbounds
168 }
169
170 type SPState struct {
171         Ctx            *Ctx
172         Node           *Node
173         Nice           uint8
174         onlineDeadline uint
175         maxOnlineTime  uint
176         hs             *noise.HandshakeState
177         csOur          *noise.CipherState
178         csTheir        *noise.CipherState
179         payloads       chan []byte
180         infosTheir     map[[32]byte]*SPInfo
181         infosOurSeen   map[[32]byte]uint8
182         queueTheir     []*FreqWithNice
183         wg             sync.WaitGroup
184         RxBytes        int64
185         RxLastSeen     time.Time
186         TxBytes        int64
187         TxLastSeen     time.Time
188         started        time.Time
189         Duration       time.Duration
190         RxSpeed        int64
191         TxSpeed        int64
192         rxLock         *os.File
193         txLock         *os.File
194         xxOnly         TRxTx
195         rxRate         int
196         txRate         int
197         isDead         bool
198         listOnly       bool
199         onlyPkts       map[[32]byte]bool
200         sync.RWMutex
201 }
202
203 func (state *SPState) NotAlive() bool {
204         if state.isDead {
205                 return true
206         }
207         now := time.Now()
208         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
209                 return true
210         }
211         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline &&
212                 uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
213 }
214
215 func (state *SPState) dirUnlock() {
216         state.Ctx.UnlockDir(state.rxLock)
217         state.Ctx.UnlockDir(state.txLock)
218 }
219
220 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
221         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
222         if err == nil {
223                 state.TxLastSeen = time.Now()
224                 state.TxBytes += int64(n)
225         }
226         return err
227 }
228
229 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
230         var sp SPRaw
231         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
232         if err != nil {
233                 ue := err.(*xdr.UnmarshalError)
234                 if ue.Err == io.EOF {
235                         return nil, ue.Err
236                 }
237                 return nil, err
238         }
239         state.RxLastSeen = time.Now()
240         state.RxBytes += int64(n)
241         if sp.Magic != MagicNNCPLv1 {
242                 return nil, BadMagic
243         }
244         return sp.Payload, nil
245 }
246
247 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
248         var infos []*SPInfo
249         var totalSize int64
250         for job := range ctx.Jobs(nodeId, TTx) {
251                 job.Fd.Close()
252                 if job.PktEnc.Nice > nice {
253                         continue
254                 }
255                 if _, known := (*seen)[*job.HshValue]; known {
256                         continue
257                 }
258                 totalSize += job.Size
259                 infos = append(infos, &SPInfo{
260                         Nice: job.PktEnc.Nice,
261                         Size: uint64(job.Size),
262                         Hash: job.HshValue,
263                 })
264                 (*seen)[*job.HshValue] = job.PktEnc.Nice
265         }
266         sort.Sort(ByNice(infos))
267         var payloads [][]byte
268         for _, info := range infos {
269                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
270                 ctx.LogD("sp-info-our", SDS{
271                         "node": nodeId,
272                         "name": ToBase32(info.Hash[:]),
273                         "size": info.Size,
274                 }, "")
275         }
276         if totalSize > 0 {
277                 ctx.LogI("sp-infos", SDS{
278                         "xx":   string(TTx),
279                         "node": nodeId,
280                         "pkts": len(payloads),
281                         "size": totalSize,
282                 }, "")
283         }
284         return payloadsSplit(payloads)
285 }
286
287 func (state *SPState) StartI(conn ConnDeadlined) error {
288         nodeId := state.Node.Id
289         err := state.Ctx.ensureRxDir(nodeId)
290         if err != nil {
291                 return err
292         }
293         var rxLock *os.File
294         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
295                 rxLock, err = state.Ctx.LockDir(nodeId, TRx)
296                 if err != nil {
297                         return err
298                 }
299         }
300         var txLock *os.File
301         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
302                 txLock, err = state.Ctx.LockDir(nodeId, TTx)
303                 if err != nil {
304                         return err
305                 }
306         }
307         started := time.Now()
308         conf := noise.Config{
309                 CipherSuite: NoiseCipherSuite,
310                 Pattern:     noise.HandshakeIK,
311                 Initiator:   true,
312                 StaticKeypair: noise.DHKey{
313                         Private: state.Ctx.Self.NoisePrv[:],
314                         Public:  state.Ctx.Self.NoisePub[:],
315                 },
316                 PeerStatic: state.Node.NoisePub[:],
317         }
318         hs, err := noise.NewHandshakeState(conf)
319         if err != nil {
320                 return err
321         }
322         state.hs = hs
323         state.payloads = make(chan []byte)
324         state.infosTheir = make(map[[32]byte]*SPInfo)
325         state.infosOurSeen = make(map[[32]byte]uint8)
326         state.started = started
327         state.rxLock = rxLock
328         state.txLock = txLock
329
330         var infosPayloads [][]byte
331         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
332                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
333         }
334         var firstPayload []byte
335         if len(infosPayloads) > 0 {
336                 firstPayload = infosPayloads[0]
337         }
338         // Pad first payload, to hide actual number of existing files
339         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
340                 firstPayload = append(firstPayload, SPHaltMarshalized...)
341         }
342
343         var buf []byte
344         var payload []byte
345         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
346         if err != nil {
347                 state.dirUnlock()
348                 return err
349         }
350         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
351         state.Ctx.LogD("sp-start", sds, "sending first message")
352         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
353         if err = state.WriteSP(conn, buf); err != nil {
354                 state.Ctx.LogE("sp-start", sds, err, "")
355                 state.dirUnlock()
356                 return err
357         }
358         state.Ctx.LogD("sp-start", sds, "waiting for first message")
359         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
360         if buf, err = state.ReadSP(conn); err != nil {
361                 state.Ctx.LogE("sp-start", sds, err, "")
362                 state.dirUnlock()
363                 return err
364         }
365         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
366         if err != nil {
367                 state.Ctx.LogE("sp-start", sds, err, "")
368                 state.dirUnlock()
369                 return err
370         }
371         state.Ctx.LogD("sp-start", sds, "starting workers")
372         err = state.StartWorkers(conn, infosPayloads, payload)
373         if err != nil {
374                 state.Ctx.LogE("sp-start", sds, err, "")
375                 state.dirUnlock()
376                 return err
377         }
378         return err
379 }
380
381 func (state *SPState) StartR(conn ConnDeadlined) error {
382         started := time.Now()
383         conf := noise.Config{
384                 CipherSuite: NoiseCipherSuite,
385                 Pattern:     noise.HandshakeIK,
386                 Initiator:   false,
387                 StaticKeypair: noise.DHKey{
388                         Private: state.Ctx.Self.NoisePrv[:],
389                         Public:  state.Ctx.Self.NoisePub[:],
390                 },
391         }
392         hs, err := noise.NewHandshakeState(conf)
393         if err != nil {
394                 return err
395         }
396         xxOnly := TRxTx("")
397         state.hs = hs
398         state.payloads = make(chan []byte)
399         state.infosOurSeen = make(map[[32]byte]uint8)
400         state.infosTheir = make(map[[32]byte]*SPInfo)
401         state.started = started
402         state.xxOnly = xxOnly
403         var buf []byte
404         var payload []byte
405         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
406         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
407         if buf, err = state.ReadSP(conn); err != nil {
408                 state.Ctx.LogE("sp-start", SDS{}, err, "")
409                 return err
410         }
411         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
412                 state.Ctx.LogE("sp-start", SDS{}, err, "")
413                 return err
414         }
415
416         var node *Node
417         for _, n := range state.Ctx.Neigh {
418                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
419                         node = n
420                         break
421                 }
422         }
423         if node == nil {
424                 peerId := ToBase32(state.hs.PeerStatic())
425                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
426                 return errors.New("Unknown peer: " + peerId)
427         }
428         state.Node = node
429         state.rxRate = node.RxRate
430         state.txRate = node.TxRate
431         state.onlineDeadline = node.OnlineDeadline
432         state.maxOnlineTime = node.MaxOnlineTime
433         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
434
435         if state.Ctx.ensureRxDir(node.Id); err != nil {
436                 return err
437         }
438         var rxLock *os.File
439         if xxOnly == "" || xxOnly == TRx {
440                 rxLock, err = state.Ctx.LockDir(node.Id, TRx)
441                 if err != nil {
442                         return err
443                 }
444         }
445         state.rxLock = rxLock
446         var txLock *os.File
447         if xxOnly == "" || xxOnly == TTx {
448                 txLock, err = state.Ctx.LockDir(node.Id, TTx)
449                 if err != nil {
450                         return err
451                 }
452         }
453         state.txLock = txLock
454
455         var infosPayloads [][]byte
456         if xxOnly == "" || xxOnly == TTx {
457                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
458         }
459         var firstPayload []byte
460         if len(infosPayloads) > 0 {
461                 firstPayload = infosPayloads[0]
462         }
463         // Pad first payload, to hide actual number of existing files
464         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
465                 firstPayload = append(firstPayload, SPHaltMarshalized...)
466         }
467
468         state.Ctx.LogD("sp-start", sds, "sending first message")
469         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
470         if err != nil {
471                 state.dirUnlock()
472                 return err
473         }
474         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
475         if err = state.WriteSP(conn, buf); err != nil {
476                 state.Ctx.LogE("sp-start", sds, err, "")
477                 state.dirUnlock()
478                 return err
479         }
480         state.Ctx.LogD("sp-start", sds, "starting workers")
481         err = state.StartWorkers(conn, infosPayloads, payload)
482         if err != nil {
483                 state.dirUnlock()
484                 return err
485         }
486         return err
487 }
488
489 func (state *SPState) StartWorkers(
490         conn ConnDeadlined,
491         infosPayloads [][]byte,
492         payload []byte) error {
493         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
494         if len(infosPayloads) > 1 {
495                 go func() {
496                         for _, payload := range infosPayloads[1:] {
497                                 state.Ctx.LogD(
498                                         "sp-work",
499                                         SdsAdd(sds, SDS{"size": len(payload)}),
500                                         "queuing remaining payload",
501                                 )
502                                 state.payloads <- payload
503                         }
504                 }()
505         }
506         state.Ctx.LogD(
507                 "sp-work",
508                 SdsAdd(sds, SDS{"size": len(payload)}),
509                 "processing first payload",
510         )
511         replies, err := state.ProcessSP(payload)
512         if err != nil {
513                 state.Ctx.LogE("sp-work", sds, err, "")
514                 return err
515         }
516
517         go func() {
518                 for _, reply := range replies {
519                         state.Ctx.LogD(
520                                 "sp-work",
521                                 SdsAdd(sds, SDS{"size": len(reply)}),
522                                 "queuing reply",
523                         )
524                         state.payloads <- reply
525                 }
526         }()
527
528         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
529                 go func() {
530                         for range time.Tick(time.Second) {
531                                 if state.NotAlive() {
532                                         return
533                                 }
534                                 for _, payload := range state.Ctx.infosOur(
535                                         state.Node.Id,
536                                         state.Nice,
537                                         &state.infosOurSeen,
538                                 ) {
539                                         state.Ctx.LogD(
540                                                 "sp-work",
541                                                 SdsAdd(sds, SDS{"size": len(payload)}),
542                                                 "queuing new info",
543                                         )
544                                         state.payloads <- payload
545                                 }
546                         }
547                 }()
548         }
549
550         state.wg.Add(1)
551         go func() {
552                 defer func() {
553                         state.isDead = true
554                         state.wg.Done()
555                 }()
556                 for {
557                         if state.NotAlive() {
558                                 return
559                         }
560                         var payload []byte
561                         select {
562                         case payload = <-state.payloads:
563                                 state.Ctx.LogD(
564                                         "sp-xmit",
565                                         SdsAdd(sds, SDS{"size": len(payload)}),
566                                         "got payload",
567                                 )
568                         default:
569                         }
570                         if payload == nil {
571                                 state.RLock()
572                                 if len(state.queueTheir) == 0 {
573                                         state.Ctx.LogD("sp-xmit", sds, "file queue is empty")
574                                         state.RUnlock()
575                                         time.Sleep(100 * time.Millisecond)
576                                         continue
577                                 }
578                                 freq := state.queueTheir[0].freq
579                                 state.RUnlock()
580
581                                 if state.txRate > 0 {
582                                         time.Sleep(time.Second / time.Duration(state.txRate))
583                                 }
584
585                                 sdsp := SdsAdd(sds, SDS{
586                                         "xx":   string(TTx),
587                                         "pkt":  ToBase32(freq.Hash[:]),
588                                         "size": int64(freq.Offset),
589                                 })
590                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
591                                 fd, err := os.Open(filepath.Join(
592                                         state.Ctx.Spool,
593                                         state.Node.Id.String(),
594                                         string(TTx),
595                                         ToBase32(freq.Hash[:]),
596                                 ))
597                                 if err != nil {
598                                         state.Ctx.LogE("sp-file", sdsp, err, "")
599                                         break
600                                 }
601                                 fi, err := fd.Stat()
602                                 if err != nil {
603                                         state.Ctx.LogE("sp-file", sdsp, err, "")
604                                         break
605                                 }
606                                 fullSize := fi.Size()
607                                 var buf []byte
608                                 if freq.Offset < uint64(fullSize) {
609                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
610                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
611                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
612                                                 break
613                                         }
614                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
615                                         n, err := fd.Read(buf)
616                                         if err != nil {
617                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
618                                                 break
619                                         }
620                                         buf = buf[:n]
621                                         state.Ctx.LogD(
622                                                 "sp-file",
623                                                 SdsAdd(sdsp, SDS{"size": n}),
624                                                 "read",
625                                         )
626                                 }
627                                 fd.Close()
628                                 payload = MarshalSP(SPTypeFile, SPFile{
629                                         Hash:    freq.Hash,
630                                         Offset:  freq.Offset,
631                                         Payload: buf,
632                                 })
633                                 ourSize := freq.Offset + uint64(len(buf))
634                                 sdsp["size"] = int64(ourSize)
635                                 sdsp["fullsize"] = fullSize
636                                 if state.Ctx.ShowPrgrs {
637                                         Progress(sdsp)
638                                 }
639                                 state.Lock()
640                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
641                                         if ourSize == uint64(fullSize) {
642                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
643                                                 if len(state.queueTheir) > 1 {
644                                                         state.queueTheir = state.queueTheir[1:]
645                                                 } else {
646                                                         state.queueTheir = state.queueTheir[:0]
647                                                 }
648                                         } else {
649                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
650                                         }
651                                 } else {
652                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
653                                 }
654                                 state.Unlock()
655                         }
656                         state.Ctx.LogD(
657                                 "sp-xmit",
658                                 SdsAdd(sds, SDS{"size": len(payload)}),
659                                 "sending",
660                         )
661                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
662                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
663                                 state.Ctx.LogE("sp-xmit", sds, err, "")
664                                 break
665                         }
666                 }
667         }()
668
669         state.wg.Add(1)
670         go func() {
671                 defer func() {
672                         state.isDead = true
673                         state.wg.Done()
674                 }()
675                 for {
676                         if state.NotAlive() {
677                                 return
678                         }
679                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
680                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
681                         payload, err := state.ReadSP(conn)
682                         if err != nil {
683                                 if err == io.EOF {
684                                         break
685                                 }
686                                 unmarshalErr := err.(*xdr.UnmarshalError)
687                                 netErr, ok := unmarshalErr.Err.(net.Error)
688                                 if ok && netErr.Timeout() {
689                                         continue
690                                 }
691                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
692                                         break
693                                 }
694                                 state.Ctx.LogE("sp-recv", sds, err, "")
695                                 break
696                         }
697                         state.Ctx.LogD(
698                                 "sp-recv",
699                                 SdsAdd(sds, SDS{"size": len(payload)}),
700                                 "got payload",
701                         )
702                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
703                         if err != nil {
704                                 state.Ctx.LogE("sp-recv", sds, err, "")
705                                 break
706                         }
707                         state.Ctx.LogD(
708                                 "sp-recv",
709                                 SdsAdd(sds, SDS{"size": len(payload)}),
710                                 "processing",
711                         )
712                         replies, err := state.ProcessSP(payload)
713                         if err != nil {
714                                 state.Ctx.LogE("sp-recv", sds, err, "")
715                                 break
716                         }
717                         go func() {
718                                 for _, reply := range replies {
719                                         state.Ctx.LogD(
720                                                 "sp-recv",
721                                                 SdsAdd(sds, SDS{"size": len(reply)}),
722                                                 "queuing reply",
723                                         )
724                                         state.payloads <- reply
725                                 }
726                         }()
727                         if state.rxRate > 0 {
728                                 time.Sleep(time.Second / time.Duration(state.rxRate))
729                         }
730                 }
731         }()
732
733         return nil
734 }
735
736 func (state *SPState) Wait() {
737         state.wg.Wait()
738         state.dirUnlock()
739         state.Duration = time.Now().Sub(state.started)
740         state.RxSpeed = state.RxBytes
741         state.TxSpeed = state.TxBytes
742         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
743         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
744         if rxDuration > 0 {
745                 state.RxSpeed = state.RxBytes / rxDuration
746         }
747         if txDuration > 0 {
748                 state.TxSpeed = state.TxBytes / txDuration
749         }
750 }
751
752 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
753         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
754         r := bytes.NewReader(payload)
755         var err error
756         var replies [][]byte
757         var infosGot bool
758         for r.Len() > 0 {
759                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
760                 var head SPHead
761                 if _, err = xdr.Unmarshal(r, &head); err != nil {
762                         state.Ctx.LogE("sp-process", sds, err, "")
763                         return nil, err
764                 }
765                 switch head.Type {
766                 case SPTypeInfo:
767                         infosGot = true
768                         sdsp := SdsAdd(sds, SDS{"type": "info"})
769                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
770                         var info SPInfo
771                         if _, err = xdr.Unmarshal(r, &info); err != nil {
772                                 state.Ctx.LogE("sp-process", sdsp, err, "")
773                                 return nil, err
774                         }
775                         sdsp = SdsAdd(sds, SDS{
776                                 "pkt":  ToBase32(info.Hash[:]),
777                                 "size": int64(info.Size),
778                                 "nice": int(info.Nice),
779                         })
780                         if !state.listOnly && info.Nice > state.Nice {
781                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
782                                 continue
783                         }
784                         state.Ctx.LogD("sp-process", sdsp, "received")
785                         if !state.listOnly && state.xxOnly == TTx {
786                                 continue
787                         }
788                         state.Lock()
789                         state.infosTheir[*info.Hash] = &info
790                         state.Unlock()
791                         state.Ctx.LogD("sp-process", sdsp, "stating part")
792                         pktPath := filepath.Join(
793                                 state.Ctx.Spool,
794                                 state.Node.Id.String(),
795                                 string(TRx),
796                                 ToBase32(info.Hash[:]),
797                         )
798                         if _, err = os.Stat(pktPath); err == nil {
799                                 state.Ctx.LogI("sp-info", sdsp, "already done")
800                                 if !state.listOnly {
801                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
802                                 }
803                                 continue
804                         }
805                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
806                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
807                                 if !state.listOnly {
808                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
809                                 }
810                                 continue
811                         }
812                         fi, err := os.Stat(pktPath + PartSuffix)
813                         var offset int64
814                         if err == nil {
815                                 offset = fi.Size()
816                         }
817                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
818                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
819                                 continue
820                         }
821                         state.Ctx.LogI(
822                                 "sp-info",
823                                 SdsAdd(sdsp, SDS{"offset": offset}),
824                                 "",
825                         )
826                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
827                                 replies = append(replies, MarshalSP(
828                                         SPTypeFreq,
829                                         SPFreq{info.Hash, uint64(offset)},
830                                 ))
831                         }
832                 case SPTypeFile:
833                         sdsp := SdsAdd(sds, SDS{"type": "file"})
834                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
835                         var file SPFile
836                         if _, err = xdr.Unmarshal(r, &file); err != nil {
837                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
838                                 return nil, err
839                         }
840                         sdsp["xx"] = string(TRx)
841                         sdsp["pkt"] = ToBase32(file.Hash[:])
842                         sdsp["size"] = len(file.Payload)
843                         dirToSync := filepath.Join(
844                                 state.Ctx.Spool,
845                                 state.Node.Id.String(),
846                                 string(TRx),
847                         )
848                         filePath := filepath.Join(dirToSync, ToBase32(file.Hash[:]))
849                         state.Ctx.LogD("sp-file", sdsp, "opening part")
850                         fd, err := os.OpenFile(
851                                 filePath+PartSuffix,
852                                 os.O_RDWR|os.O_CREATE,
853                                 os.FileMode(0666),
854                         )
855                         if err != nil {
856                                 state.Ctx.LogE("sp-file", sdsp, err, "")
857                                 return nil, err
858                         }
859                         state.Ctx.LogD(
860                                 "sp-file",
861                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
862                                 "seeking",
863                         )
864                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
865                                 state.Ctx.LogE("sp-file", sdsp, err, "")
866                                 fd.Close()
867                                 return nil, err
868                         }
869                         state.Ctx.LogD("sp-file", sdsp, "writing")
870                         _, err = fd.Write(file.Payload)
871                         if err != nil {
872                                 state.Ctx.LogE("sp-file", sdsp, err, "")
873                                 fd.Close()
874                                 return nil, err
875                         }
876                         ourSize := file.Offset + uint64(len(file.Payload))
877                         state.RLock()
878                         sdsp["size"] = int64(ourSize)
879                         sdsp["fullsize"] = int64(state.infosTheir[*file.Hash].Size)
880                         if state.Ctx.ShowPrgrs {
881                                 Progress(sdsp)
882                         }
883                         if state.infosTheir[*file.Hash].Size != ourSize {
884                                 state.RUnlock()
885                                 fd.Close()
886                                 continue
887                         }
888                         state.RUnlock()
889                         spWorkersGroup.Wait()
890                         spWorkersGroup.Add(1)
891                         go func() {
892                                 if err := fd.Sync(); err != nil {
893                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
894                                         fd.Close()
895                                         return
896                                 }
897                                 state.wg.Add(1)
898                                 defer state.wg.Done()
899                                 fd.Seek(0, io.SeekStart)
900                                 state.Ctx.LogD("sp-file", sdsp, "checking")
901                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
902                                 fd.Close()
903                                 if err != nil || !gut {
904                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
905                                         return
906                                 }
907                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
908                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
909                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
910                                         return
911                                 }
912                                 if err = DirSync(dirToSync); err != nil {
913                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
914                                         return
915                                 }
916                                 state.Lock()
917                                 delete(state.infosTheir, *file.Hash)
918                                 state.Unlock()
919                                 spWorkersGroup.Done()
920                                 go func() {
921                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
922                                 }()
923                         }()
924                 case SPTypeDone:
925                         sdsp := SdsAdd(sds, SDS{"type": "done"})
926                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
927                         var done SPDone
928                         if _, err = xdr.Unmarshal(r, &done); err != nil {
929                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
930                                 return nil, err
931                         }
932                         sdsp["pkt"] = ToBase32(done.Hash[:])
933                         state.Ctx.LogD("sp-done", sdsp, "removing")
934                         err := os.Remove(filepath.Join(
935                                 state.Ctx.Spool,
936                                 state.Node.Id.String(),
937                                 string(TTx),
938                                 ToBase32(done.Hash[:]),
939                         ))
940                         sdsp["xx"] = string(TTx)
941                         if err == nil {
942                                 state.Ctx.LogI("sp-done", sdsp, "")
943                         } else {
944                                 state.Ctx.LogE("sp-done", sdsp, err, "")
945                         }
946                 case SPTypeFreq:
947                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
948                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
949                         var freq SPFreq
950                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
951                                 state.Ctx.LogE("sp-process", sdsp, err, "")
952                                 return nil, err
953                         }
954                         sdsp["pkt"] = ToBase32(freq.Hash[:])
955                         sdsp["offset"] = freq.Offset
956                         state.Ctx.LogD("sp-process", sdsp, "queueing")
957                         nice, exists := state.infosOurSeen[*freq.Hash]
958                         if exists {
959                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
960                                         state.Lock()
961                                         insertIdx := 0
962                                         var freqWithNice *FreqWithNice
963                                         for insertIdx, freqWithNice = range state.queueTheir {
964                                                 if freqWithNice.nice > nice {
965                                                         break
966                                                 }
967                                         }
968                                         state.queueTheir = append(state.queueTheir, nil)
969                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
970                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
971                                         state.Unlock()
972                                 } else {
973                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
974                                 }
975                         } else {
976                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
977                         }
978                 case SPTypeHalt:
979                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
980                         state.Lock()
981                         state.queueTheir = nil
982                         state.Unlock()
983                 default:
984                         state.Ctx.LogE(
985                                 "sp-process",
986                                 SdsAdd(sds, SDS{"type": head.Type}),
987                                 errors.New("unknown type"),
988                                 "",
989                         )
990                         return nil, BadPktType
991                 }
992         }
993         if infosGot {
994                 var pkts int
995                 var size uint64
996                 state.RLock()
997                 for _, info := range state.infosTheir {
998                         pkts++
999                         size += info.Size
1000                 }
1001                 state.RUnlock()
1002                 state.Ctx.LogI("sp-infos", SDS{
1003                         "xx":   string(TRx),
1004                         "node": state.Node.Id,
1005                         "pkts": pkts,
1006                         "size": int64(size),
1007                 }, "")
1008         }
1009         return payloadsSplit(replies), nil
1010 }