]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/sp.go
SP checks .seen existence
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
38 const (
39         MaxSPSize       = 1<<16 - 256
40         PartSuffix      = ".part"
41         DefaultDeadline = 10
42 )
43
44 var (
45         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
46
47         SPHeadOverhead    int
48         SPInfoOverhead    int
49         SPFreqOverhead    int
50         SPFileOverhead    int
51         SPHaltMarshalized []byte
52
53         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54                 noise.DH25519,
55                 noise.CipherChaChaPoly,
56                 noise.HashBLAKE2b,
57         )
58
59         spWorkersGroup sync.WaitGroup
60 )
61
62 type SPType uint8
63
64 const (
65         SPTypeInfo SPType = iota
66         SPTypeFreq SPType = iota
67         SPTypeFile SPType = iota
68         SPTypeDone SPType = iota
69         SPTypeHalt SPType = iota
70 )
71
72 type SPHead struct {
73         Type SPType
74 }
75
76 type SPInfo struct {
77         Nice uint8
78         Size uint64
79         Hash *[32]byte
80 }
81
82 type SPFreq struct {
83         Hash   *[32]byte
84         Offset uint64
85 }
86
87 type SPFile struct {
88         Hash    *[32]byte
89         Offset  uint64
90         Payload []byte
91 }
92
93 type SPDone struct {
94         Hash *[32]byte
95 }
96
97 type SPRaw struct {
98         Magic   [8]byte
99         Payload []byte
100 }
101
102 func init() {
103         var buf bytes.Buffer
104         spHead := SPHead{Type: SPTypeHalt}
105         if _, err := xdr.Marshal(&buf, spHead); err != nil {
106                 panic(err)
107         }
108         copy(SPHaltMarshalized, buf.Bytes())
109         SPHeadOverhead = buf.Len()
110         buf.Reset()
111
112         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
113         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
114                 panic(err)
115         }
116         SPInfoOverhead = buf.Len()
117         buf.Reset()
118
119         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
120         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
121                 panic(err)
122         }
123         SPFreqOverhead = buf.Len()
124         buf.Reset()
125
126         spFile := SPFile{Hash: new([32]byte), Offset: 123}
127         if _, err := xdr.Marshal(&buf, spFile); err != nil {
128                 panic(err)
129         }
130         SPFileOverhead = buf.Len()
131 }
132
133 func MarshalSP(typ SPType, sp interface{}) []byte {
134         var buf bytes.Buffer
135         var err error
136         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
137                 panic(err)
138         }
139         if _, err = xdr.Marshal(&buf, sp); err != nil {
140                 panic(err)
141         }
142         return buf.Bytes()
143 }
144
145 func payloadsSplit(payloads [][]byte) [][]byte {
146         var outbounds [][]byte
147         outbound := make([]byte, 0, MaxSPSize)
148         for i, payload := range payloads {
149                 outbound = append(outbound, payload...)
150                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
151                         outbounds = append(outbounds, outbound)
152                         outbound = make([]byte, 0, MaxSPSize)
153                 }
154         }
155         if len(outbound) > 0 {
156                 outbounds = append(outbounds, outbound)
157         }
158         return outbounds
159 }
160
161 type SPState struct {
162         ctx            *Ctx
163         Node           *Node
164         onlineDeadline uint
165         maxOnlineTime  uint
166         nice           uint8
167         hs             *noise.HandshakeState
168         csOur          *noise.CipherState
169         csTheir        *noise.CipherState
170         payloads       chan []byte
171         infosTheir     map[[32]byte]*SPInfo
172         infosOurSeen   map[[32]byte]struct{}
173         queueTheir     []*SPFreq
174         wg             sync.WaitGroup
175         RxBytes        int64
176         RxLastSeen     time.Time
177         TxBytes        int64
178         TxLastSeen     time.Time
179         started        time.Time
180         Duration       time.Duration
181         RxSpeed        int64
182         TxSpeed        int64
183         rxLock         *os.File
184         txLock         *os.File
185         xxOnly         TRxTx
186         isDead         bool
187         sync.RWMutex
188 }
189
190 func (state *SPState) NotAlive() bool {
191         if state.isDead {
192                 return true
193         }
194         now := time.Now()
195         if state.maxOnlineTime > 0 && state.started.Add(time.Duration(state.maxOnlineTime)*time.Second).Before(now) {
196                 return true
197         }
198         return uint(now.Sub(state.RxLastSeen).Seconds()) >= state.onlineDeadline && uint(now.Sub(state.TxLastSeen).Seconds()) >= state.onlineDeadline
199 }
200
201 func (state *SPState) dirUnlock() {
202         state.ctx.UnlockDir(state.rxLock)
203         state.ctx.UnlockDir(state.txLock)
204 }
205
206 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
207         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
208         if err == nil {
209                 state.TxLastSeen = time.Now()
210                 state.TxBytes += int64(n)
211         }
212         return err
213 }
214
215 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
216         var sp SPRaw
217         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
218         if err != nil {
219                 return nil, err
220         }
221         state.RxLastSeen = time.Now()
222         state.RxBytes += int64(n)
223         if sp.Magic != MagicNNCPLv1 {
224                 return nil, BadMagic
225         }
226         return sp.Payload, nil
227 }
228
229 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]struct{}) [][]byte {
230         var infos []*SPInfo
231         var totalSize int64
232         for job := range ctx.Jobs(nodeId, TTx) {
233                 job.Fd.Close()
234                 if job.PktEnc.Nice > nice {
235                         continue
236                 }
237                 if _, known := (*seen)[*job.HshValue]; known {
238                         continue
239                 }
240                 totalSize += job.Size
241                 infos = append(infos, &SPInfo{
242                         Nice: job.PktEnc.Nice,
243                         Size: uint64(job.Size),
244                         Hash: job.HshValue,
245                 })
246                 (*seen)[*job.HshValue] = struct{}{}
247         }
248         sort.Sort(ByNice(infos))
249         var payloads [][]byte
250         for _, info := range infos {
251                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
252                 ctx.LogD("sp-info-our", SDS{
253                         "node": nodeId,
254                         "name": ToBase32(info.Hash[:]),
255                         "size": strconv.FormatInt(int64(info.Size), 10),
256                 }, "")
257         }
258         if totalSize > 0 {
259                 ctx.LogI("sp-infos", SDS{
260                         "xx":   string(TTx),
261                         "node": nodeId,
262                         "pkts": strconv.Itoa(len(payloads)),
263                         "size": strconv.FormatInt(totalSize, 10),
264                 }, "")
265         }
266         return payloadsSplit(payloads)
267 }
268
269 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly TRxTx, onlineDeadline, maxOnlineTime uint) (*SPState, error) {
270         err := ctx.ensureRxDir(nodeId)
271         if err != nil {
272                 return nil, err
273         }
274         var rxLock *os.File
275         if xxOnly == "" || xxOnly == TRx {
276                 rxLock, err = ctx.LockDir(nodeId, TRx)
277                 if err != nil {
278                         return nil, err
279                 }
280         }
281         var txLock *os.File
282         if xxOnly == "" || xxOnly == TTx {
283                 txLock, err = ctx.LockDir(nodeId, TTx)
284                 if err != nil {
285                         return nil, err
286                 }
287         }
288         started := time.Now()
289         node := ctx.Neigh[*nodeId]
290         conf := noise.Config{
291                 CipherSuite: NoiseCipherSuite,
292                 Pattern:     noise.HandshakeIK,
293                 Initiator:   true,
294                 StaticKeypair: noise.DHKey{
295                         Private: ctx.Self.NoisePrv[:],
296                         Public:  ctx.Self.NoisePub[:],
297                 },
298                 PeerStatic: node.NoisePub[:],
299         }
300         hs, err := noise.NewHandshakeState(conf)
301         if err != nil {
302                 return nil, err
303         }
304         state := SPState{
305                 ctx:            ctx,
306                 hs:             hs,
307                 Node:           node,
308                 onlineDeadline: onlineDeadline,
309                 maxOnlineTime:  maxOnlineTime,
310                 nice:           nice,
311                 payloads:       make(chan []byte),
312                 infosTheir:     make(map[[32]byte]*SPInfo),
313                 infosOurSeen:   make(map[[32]byte]struct{}),
314                 started:        started,
315                 rxLock:         rxLock,
316                 txLock:         txLock,
317                 xxOnly:         xxOnly,
318         }
319
320         var infosPayloads [][]byte
321         if xxOnly == "" || xxOnly == TTx {
322                 infosPayloads = ctx.infosOur(nodeId, nice, &state.infosOurSeen)
323         }
324         var firstPayload []byte
325         if len(infosPayloads) > 0 {
326                 firstPayload = infosPayloads[0]
327         }
328         // Pad first payload, to hide actual number of existing files
329         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
330                 firstPayload = append(firstPayload, SPHaltMarshalized...)
331         }
332
333         var buf []byte
334         var payload []byte
335         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
336         if err != nil {
337                 state.dirUnlock()
338                 return nil, err
339         }
340         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
341         ctx.LogD("sp-start", sds, "sending first message")
342         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
343         if err = state.WriteSP(conn, buf); err != nil {
344                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
345                 state.dirUnlock()
346                 return nil, err
347         }
348         ctx.LogD("sp-start", sds, "waiting for first message")
349         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
350         if buf, err = state.ReadSP(conn); err != nil {
351                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
352                 state.dirUnlock()
353                 return nil, err
354         }
355         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
356         if err != nil {
357                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
358                 state.dirUnlock()
359                 return nil, err
360         }
361         ctx.LogD("sp-start", sds, "starting workers")
362         err = state.StartWorkers(conn, infosPayloads, payload)
363         if err != nil {
364                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
365                 state.dirUnlock()
366                 return nil, err
367         }
368         return &state, err
369 }
370
371 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly TRxTx) (*SPState, error) {
372         started := time.Now()
373         conf := noise.Config{
374                 CipherSuite: NoiseCipherSuite,
375                 Pattern:     noise.HandshakeIK,
376                 Initiator:   false,
377                 StaticKeypair: noise.DHKey{
378                         Private: ctx.Self.NoisePrv[:],
379                         Public:  ctx.Self.NoisePub[:],
380                 },
381         }
382         hs, err := noise.NewHandshakeState(conf)
383         if err != nil {
384                 return nil, err
385         }
386         state := SPState{
387                 ctx:          ctx,
388                 hs:           hs,
389                 nice:         nice,
390                 payloads:     make(chan []byte),
391                 infosOurSeen: make(map[[32]byte]struct{}),
392                 infosTheir:   make(map[[32]byte]*SPInfo),
393                 started:      started,
394                 xxOnly:       xxOnly,
395         }
396         var buf []byte
397         var payload []byte
398         ctx.LogD(
399                 "sp-start",
400                 SDS{"nice": strconv.Itoa(int(nice))},
401                 "waiting for first message",
402         )
403         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
404         if buf, err = state.ReadSP(conn); err != nil {
405                 ctx.LogE("sp-start", SDS{"err": err}, "")
406                 return nil, err
407         }
408         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
409                 ctx.LogE("sp-start", SDS{"err": err}, "")
410                 return nil, err
411         }
412
413         var node *Node
414         for _, node = range ctx.Neigh {
415                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
416                         break
417                 }
418         }
419         if node == nil {
420                 peerId := ToBase32(state.hs.PeerStatic())
421                 ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
422                 return nil, errors.New("Unknown peer: " + peerId)
423         }
424         state.Node = node
425         state.onlineDeadline = node.OnlineDeadline
426         state.maxOnlineTime = node.MaxOnlineTime
427         sds := SDS{"node": node.Id, "nice": strconv.Itoa(int(nice))}
428
429         if ctx.ensureRxDir(node.Id); err != nil {
430                 return nil, err
431         }
432         var rxLock *os.File
433         if xxOnly == "" || xxOnly == TRx {
434                 rxLock, err = ctx.LockDir(node.Id, TRx)
435                 if err != nil {
436                         return nil, err
437                 }
438         }
439         state.rxLock = rxLock
440         var txLock *os.File
441         if xxOnly == "" || xxOnly == TTx {
442                 txLock, err = ctx.LockDir(node.Id, TTx)
443                 if err != nil {
444                         return nil, err
445                 }
446         }
447         state.txLock = txLock
448
449         var infosPayloads [][]byte
450         if xxOnly == "" || xxOnly == TTx {
451                 infosPayloads = ctx.infosOur(node.Id, nice, &state.infosOurSeen)
452         }
453         var firstPayload []byte
454         if len(infosPayloads) > 0 {
455                 firstPayload = infosPayloads[0]
456         }
457         // Pad first payload, to hide actual number of existing files
458         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
459                 firstPayload = append(firstPayload, SPHaltMarshalized...)
460         }
461
462         ctx.LogD("sp-start", sds, "sending first message")
463         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
464         if err != nil {
465                 state.dirUnlock()
466                 return nil, err
467         }
468         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline * time.Second))
469         if err = state.WriteSP(conn, buf); err != nil {
470                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
471                 state.dirUnlock()
472                 return nil, err
473         }
474         ctx.LogD("sp-start", sds, "starting workers")
475         err = state.StartWorkers(conn, infosPayloads, payload)
476         if err != nil {
477                 state.dirUnlock()
478                 return nil, err
479         }
480         return &state, err
481 }
482
483 func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
484         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
485         if len(infosPayloads) > 1 {
486                 go func() {
487                         for _, payload := range infosPayloads[1:] {
488                                 state.ctx.LogD(
489                                         "sp-work",
490                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
491                                         "queuing remaining payload",
492                                 )
493                                 state.payloads <- payload
494                         }
495                 }()
496         }
497         state.ctx.LogD(
498                 "sp-work",
499                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
500                 "processing first payload",
501         )
502         replies, err := state.ProcessSP(payload)
503         if err != nil {
504                 state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
505                 return err
506         }
507
508         go func() {
509                 for _, reply := range replies {
510                         state.ctx.LogD(
511                                 "sp-work",
512                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
513                                 "queuing reply",
514                         )
515                         state.payloads <- reply
516                 }
517         }()
518
519         if state.xxOnly == "" || state.xxOnly == TTx {
520                 go func() {
521                         for range time.Tick(time.Second) {
522                                 for _, payload := range state.ctx.infosOur(
523                                         state.Node.Id,
524                                         state.nice,
525                                         &state.infosOurSeen,
526                                 ) {
527                                         state.ctx.LogD(
528                                                 "sp-work",
529                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
530                                                 "queuing new info",
531                                         )
532                                         state.payloads <- payload
533                                 }
534                         }
535                 }()
536         }
537
538         state.wg.Add(1)
539         go func() {
540                 defer func() {
541                         state.isDead = true
542                         state.wg.Done()
543                 }()
544                 for {
545                         if state.NotAlive() {
546                                 return
547                         }
548                         var payload []byte
549                         select {
550                         case payload = <-state.payloads:
551                                 state.ctx.LogD(
552                                         "sp-xmit",
553                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
554                                         "got payload",
555                                 )
556                         default:
557                         }
558                         if payload == nil {
559                                 state.RLock()
560                                 if len(state.queueTheir) == 0 {
561                                         state.ctx.LogD("sp-xmit", sds, "file queue is empty")
562                                         state.RUnlock()
563                                         time.Sleep(100 * time.Millisecond)
564                                         continue
565                                 }
566                                 freq := state.queueTheir[0]
567                                 state.RUnlock()
568                                 sdsp := SdsAdd(sds, SDS{
569                                         "xx":   string(TTx),
570                                         "hash": ToBase32(freq.Hash[:]),
571                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
572                                 })
573                                 state.ctx.LogD("sp-file", sdsp, "queueing")
574                                 fd, err := os.Open(filepath.Join(
575                                         state.ctx.Spool,
576                                         state.Node.Id.String(),
577                                         string(TTx),
578                                         ToBase32(freq.Hash[:]),
579                                 ))
580                                 if err != nil {
581                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
582                                         break
583                                 }
584                                 fi, err := fd.Stat()
585                                 if err != nil {
586                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
587                                         break
588                                 }
589                                 fullSize := uint64(fi.Size())
590                                 var buf []byte
591                                 if freq.Offset < fullSize {
592                                         state.ctx.LogD("sp-file", sdsp, "seeking")
593                                         if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
594                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
595                                                 break
596                                         }
597                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
598                                         n, err := fd.Read(buf)
599                                         if err != nil {
600                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
601                                                 break
602                                         }
603                                         buf = buf[:n]
604                                         state.ctx.LogD(
605                                                 "sp-file",
606                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
607                                                 "read",
608                                         )
609                                 }
610                                 fd.Close()
611                                 payload = MarshalSP(SPTypeFile, SPFile{
612                                         Hash:    freq.Hash,
613                                         Offset:  freq.Offset,
614                                         Payload: buf,
615                                 })
616                                 ourSize := freq.Offset + uint64(len(buf))
617                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
618                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
619                                 state.ctx.LogP("sp-file", sdsp, "")
620                                 state.Lock()
621                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
622                                         if ourSize == fullSize {
623                                                 state.ctx.LogD("sp-file", sdsp, "finished")
624                                                 if len(state.queueTheir) > 1 {
625                                                         state.queueTheir = state.queueTheir[1:]
626                                                 } else {
627                                                         state.queueTheir = state.queueTheir[:0]
628                                                 }
629                                         } else {
630                                                 state.queueTheir[0].Offset += uint64(len(buf))
631                                         }
632                                 } else {
633                                         state.ctx.LogD("sp-file", sdsp, "queue disappeared")
634                                 }
635                                 state.Unlock()
636                         }
637                         state.ctx.LogD(
638                                 "sp-xmit",
639                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
640                                 "sending",
641                         )
642                         conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
643                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
644                                 state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
645                                 break
646                         }
647                 }
648         }()
649
650         state.wg.Add(1)
651         go func() {
652                 defer func() {
653                         state.isDead = true
654                         state.wg.Done()
655                 }()
656                 for {
657                         if state.NotAlive() {
658                                 return
659                         }
660                         state.ctx.LogD("sp-recv", sds, "waiting for payload")
661                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline * time.Second))
662                         payload, err := state.ReadSP(conn)
663                         if err != nil {
664                                 unmarshalErr := err.(*xdr.UnmarshalError)
665                                 netErr, ok := unmarshalErr.Err.(net.Error)
666                                 if ok && netErr.Timeout() {
667                                         continue
668                                 }
669                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
670                                         break
671                                 }
672                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
673                                 break
674                         }
675                         state.ctx.LogD(
676                                 "sp-recv",
677                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
678                                 "got payload",
679                         )
680                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
681                         if err != nil {
682                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
683                                 break
684                         }
685                         state.ctx.LogD(
686                                 "sp-recv",
687                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
688                                 "processing",
689                         )
690                         replies, err := state.ProcessSP(payload)
691                         if err != nil {
692                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
693                                 break
694                         }
695                         go func() {
696                                 for _, reply := range replies {
697                                         state.ctx.LogD(
698                                                 "sp-recv",
699                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
700                                                 "queuing reply",
701                                         )
702                                         state.payloads <- reply
703                                 }
704                         }()
705                 }
706         }()
707
708         return nil
709 }
710
711 func (state *SPState) Wait() {
712         state.wg.Wait()
713         state.dirUnlock()
714         state.Duration = time.Now().Sub(state.started)
715         state.RxSpeed = state.RxBytes
716         state.TxSpeed = state.TxBytes
717         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
718         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
719         if rxDuration > 0 {
720                 state.RxSpeed = state.RxBytes / rxDuration
721         }
722         if txDuration > 0 {
723                 state.TxSpeed = state.TxBytes / txDuration
724         }
725 }
726
727 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
728         sds := SDS{"node": state.Node.Id, "nice": strconv.Itoa(int(state.nice))}
729         r := bytes.NewReader(payload)
730         var err error
731         var replies [][]byte
732         var infosGot bool
733         for r.Len() > 0 {
734                 state.ctx.LogD("sp-process", sds, "unmarshaling header")
735                 var head SPHead
736                 if _, err = xdr.Unmarshal(r, &head); err != nil {
737                         state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
738                         return nil, err
739                 }
740                 switch head.Type {
741                 case SPTypeInfo:
742                         infosGot = true
743                         sdsp := SdsAdd(sds, SDS{"type": "info"})
744                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
745                         var info SPInfo
746                         if _, err = xdr.Unmarshal(r, &info); err != nil {
747                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
748                                 return nil, err
749                         }
750                         sdsp = SdsAdd(sds, SDS{
751                                 "hash": ToBase32(info.Hash[:]),
752                                 "size": strconv.FormatInt(int64(info.Size), 10),
753                         })
754                         if info.Nice > state.nice {
755                                 state.ctx.LogD("sp-process", sdsp, "too nice")
756                                 continue
757                         }
758                         state.ctx.LogD("sp-process", sdsp, "received")
759                         if state.xxOnly == TTx {
760                                 continue
761                         }
762                         state.Lock()
763                         state.infosTheir[*info.Hash] = &info
764                         state.Unlock()
765                         state.ctx.LogD("sp-process", sdsp, "stating part")
766                         pktPath := filepath.Join(
767                                 state.ctx.Spool,
768                                 state.Node.Id.String(),
769                                 string(TRx),
770                                 ToBase32(info.Hash[:]),
771                         )
772                         if _, err = os.Stat(pktPath); err == nil {
773                                 state.ctx.LogD("sp-process", sdsp, "already done")
774                                 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
775                                 continue
776                         }
777                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
778                                 state.ctx.LogD("sp-process", sdsp, "already seen")
779                                 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
780                                 continue
781                         }
782                         fi, err := os.Stat(pktPath + PartSuffix)
783                         var offset int64
784                         if err == nil {
785                                 offset = fi.Size()
786                                 state.ctx.LogD(
787                                         "sp-process",
788                                         SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
789                                         "part exists",
790                                 )
791                         }
792                         replies = append(replies, MarshalSP(
793                                 SPTypeFreq,
794                                 SPFreq{info.Hash, uint64(offset)},
795                         ))
796                 case SPTypeFile:
797                         state.ctx.LogD(
798                                 "sp-process",
799                                 SdsAdd(sds, SDS{"type": "file"}),
800                                 "unmarshaling packet",
801                         )
802                         var file SPFile
803                         if _, err = xdr.Unmarshal(r, &file); err != nil {
804                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
805                                         "err":  err,
806                                         "type": "file",
807                                 }), "")
808                                 return nil, err
809                         }
810                         sdsp := SdsAdd(sds, SDS{
811                                 "xx":   string(TRx),
812                                 "hash": ToBase32(file.Hash[:]),
813                                 "size": strconv.Itoa(len(file.Payload)),
814                         })
815                         filePath := filepath.Join(
816                                 state.ctx.Spool,
817                                 state.Node.Id.String(),
818                                 string(TRx),
819                                 ToBase32(file.Hash[:]),
820                         )
821                         state.ctx.LogD("sp-file", sdsp, "opening part")
822                         fd, err := os.OpenFile(
823                                 filePath+PartSuffix,
824                                 os.O_RDWR|os.O_CREATE,
825                                 os.FileMode(0600),
826                         )
827                         if err != nil {
828                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
829                                 return nil, err
830                         }
831                         state.ctx.LogD(
832                                 "sp-file",
833                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
834                                 "seeking",
835                         )
836                         if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
837                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
838                                 fd.Close()
839                                 return nil, err
840                         }
841                         state.ctx.LogD("sp-file", sdsp, "writing")
842                         _, err = fd.Write(file.Payload)
843                         if err != nil {
844                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
845                                 fd.Close()
846                                 return nil, err
847                         }
848                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
849                         state.RLock()
850                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
851                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
852                         state.ctx.LogP("sp-file", sdsp, "")
853                         if state.infosTheir[*file.Hash].Size != ourSize {
854                                 state.RUnlock()
855                                 fd.Close()
856                                 continue
857                         }
858                         state.RUnlock()
859                         spWorkersGroup.Wait()
860                         spWorkersGroup.Add(1)
861                         go func() {
862                                 if err := fd.Sync(); err != nil {
863                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
864                                         fd.Close()
865                                         return
866                                 }
867                                 state.wg.Add(1)
868                                 defer state.wg.Done()
869                                 fd.Seek(0, 0)
870                                 state.ctx.LogD("sp-file", sdsp, "checking")
871                                 gut, err := Check(fd, file.Hash[:])
872                                 fd.Close()
873                                 if err != nil || !gut {
874                                         state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
875                                         return
876                                 }
877                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
878                                 os.Rename(filePath+PartSuffix, filePath)
879                                 state.Lock()
880                                 delete(state.infosTheir, *file.Hash)
881                                 state.Unlock()
882                                 spWorkersGroup.Done()
883                                 go func() {
884                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
885                                 }()
886                         }()
887                 case SPTypeDone:
888                         state.ctx.LogD(
889                                 "sp-process",
890                                 SdsAdd(sds, SDS{"type": "done"}),
891                                 "unmarshaling packet",
892                         )
893                         var done SPDone
894                         if _, err = xdr.Unmarshal(r, &done); err != nil {
895                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
896                                         "type": "done",
897                                         "err":  err,
898                                 }), "")
899                                 return nil, err
900                         }
901                         sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
902                         state.ctx.LogD("sp-done", sdsp, "removing")
903                         err := os.Remove(filepath.Join(
904                                 state.ctx.Spool,
905                                 state.Node.Id.String(),
906                                 string(TTx),
907                                 ToBase32(done.Hash[:]),
908                         ))
909                         if err == nil {
910                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
911                         } else {
912                                 state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
913                         }
914                 case SPTypeFreq:
915                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
916                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
917                         var freq SPFreq
918                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
919                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
920                                 return nil, err
921                         }
922                         state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
923                                 "hash":   ToBase32(freq.Hash[:]),
924                                 "offset": strconv.FormatInt(int64(freq.Offset), 10),
925                         }), "queueing")
926                         state.Lock()
927                         state.queueTheir = append(state.queueTheir, &freq)
928                         state.Unlock()
929                 case SPTypeHalt:
930                         sdsp := SdsAdd(sds, SDS{"type": "halt"})
931                         state.ctx.LogD("sp-process", sdsp, "")
932                         state.Lock()
933                         state.queueTheir = nil
934                         state.Unlock()
935                 default:
936                         state.ctx.LogE(
937                                 "sp-process",
938                                 SdsAdd(sds, SDS{"type": head.Type}),
939                                 "unknown",
940                         )
941                         return nil, BadPktType
942                 }
943         }
944         if infosGot {
945                 var pkts int
946                 var size uint64
947                 state.RLock()
948                 for _, info := range state.infosTheir {
949                         pkts++
950                         size += info.Size
951                 }
952                 state.RUnlock()
953                 state.ctx.LogI("sp-infos", SDS{
954                         "xx":   string(TRx),
955                         "node": state.Node.Id,
956                         "pkts": strconv.Itoa(pkts),
957                         "size": strconv.FormatInt(int64(size), 10),
958                 }, "")
959         }
960         return payloadsSplit(replies), nil
961 }