]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/sp.go
Fix max Noise payload size
[nncp.git] / src / cypherpunks.ru / nncp / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
38 const (
39         MaxSPSize        = 2<<16 - 256
40         PartSuffix       = ".part"
41         DeadlineDuration = 10
42 )
43
44 var (
45         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 1, 0, 0}
46
47         SPHeadOverhead    int
48         SPInfoOverhead    int
49         SPFreqOverhead    int
50         SPFileOverhead    int
51         SPHaltMarshalized []byte
52
53         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54                 noise.DH25519,
55                 noise.CipherChaChaPoly,
56                 noise.HashBLAKE2b,
57         )
58 )
59
60 type SPType uint8
61
62 const (
63         SPTypeInfo SPType = iota
64         SPTypeFreq SPType = iota
65         SPTypeFile SPType = iota
66         SPTypeDone SPType = iota
67         SPTypeHalt SPType = iota
68 )
69
70 type SPHead struct {
71         Type SPType
72 }
73
74 type SPInfo struct {
75         Nice uint8
76         Size uint64
77         Hash *[32]byte
78 }
79
80 type SPFreq struct {
81         Hash   *[32]byte
82         Offset uint64
83 }
84
85 type SPFile struct {
86         Hash    *[32]byte
87         Offset  uint64
88         Payload []byte
89 }
90
91 type SPDone struct {
92         Hash *[32]byte
93 }
94
95 type SPRaw struct {
96         Magic   [8]byte
97         Payload []byte
98 }
99
100 func init() {
101         var buf bytes.Buffer
102         spHead := SPHead{Type: SPTypeHalt}
103         if _, err := xdr.Marshal(&buf, spHead); err != nil {
104                 panic(err)
105         }
106         copy(SPHaltMarshalized, buf.Bytes())
107         SPHeadOverhead = buf.Len()
108         buf.Reset()
109
110         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
111         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
112                 panic(err)
113         }
114         SPInfoOverhead = buf.Len()
115         buf.Reset()
116
117         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
118         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
119                 panic(err)
120         }
121         SPFreqOverhead = buf.Len()
122         buf.Reset()
123
124         spFile := SPFile{Hash: new([32]byte), Offset: 123}
125         if _, err := xdr.Marshal(&buf, spFile); err != nil {
126                 panic(err)
127         }
128         SPFileOverhead = buf.Len()
129 }
130
131 func MarshalSP(typ SPType, sp interface{}) []byte {
132         var buf bytes.Buffer
133         var err error
134         if _, err = xdr.Marshal(&buf, SPHead{typ}); err != nil {
135                 panic(err)
136         }
137         if _, err = xdr.Marshal(&buf, sp); err != nil {
138                 panic(err)
139         }
140         return buf.Bytes()
141 }
142
143 func payloadsSplit(payloads [][]byte) [][]byte {
144         var outbounds [][]byte
145         outbound := make([]byte, 0, MaxSPSize)
146         for i, payload := range payloads {
147                 outbound = append(outbound, payload...)
148                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
149                         outbounds = append(outbounds, outbound)
150                         outbound = make([]byte, 0, MaxSPSize)
151                 }
152         }
153         if len(outbound) > 0 {
154                 outbounds = append(outbounds, outbound)
155         }
156         return outbounds
157 }
158
159 type SPState struct {
160         ctx        *Ctx
161         NodeId     *NodeId
162         nice       uint8
163         hs         *noise.HandshakeState
164         csOur      *noise.CipherState
165         csTheir    *noise.CipherState
166         payloads   chan []byte
167         infosTheir map[[32]byte]*SPInfo
168         queueTheir []*SPFreq
169         wg         sync.WaitGroup
170         RxBytes    int64
171         RxLastSeen time.Time
172         TxBytes    int64
173         TxLastSeen time.Time
174         started    time.Time
175         Duration   time.Duration
176         RxSpeed    int64
177         TxSpeed    int64
178         rxLock     *os.File
179         txLock     *os.File
180         xxOnly     *TRxTx
181         sync.RWMutex
182 }
183
184 func (state *SPState) isDead() bool {
185         now := time.Now()
186         return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration
187 }
188
189 func (state *SPState) dirUnlock() {
190         state.ctx.UnlockDir(state.rxLock)
191         state.ctx.UnlockDir(state.txLock)
192 }
193
194 func (state *SPState) WriteSP(dst io.Writer, payload []byte) error {
195         n, err := xdr.Marshal(dst, SPRaw{Magic: MagicNNCPLv1, Payload: payload})
196         if err == nil {
197                 state.TxLastSeen = time.Now()
198                 state.TxBytes += int64(n)
199         }
200         return err
201 }
202
203 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
204         var sp SPRaw
205         n, err := xdr.Unmarshal(src, &sp)
206         if err != nil {
207                 return nil, err
208         }
209         state.RxLastSeen = time.Now()
210         state.RxBytes += int64(n)
211         if sp.Magic != MagicNNCPLv1 {
212                 return nil, BadMagic
213         }
214         return sp.Payload, nil
215 }
216
217 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
218         var infos []*SPInfo
219         var totalSize int64
220         for job := range ctx.Jobs(nodeId, TTx) {
221                 job.Fd.Close()
222                 if job.PktEnc.Nice > nice {
223                         continue
224                 }
225                 totalSize += job.Size
226                 infos = append(infos, &SPInfo{
227                         Nice: job.PktEnc.Nice,
228                         Size: uint64(job.Size),
229                         Hash: job.HshValue,
230                 })
231         }
232         sort.Sort(ByNice(infos))
233         var payloads [][]byte
234         for _, info := range infos {
235                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
236                 ctx.LogD("sp-info-our", SDS{
237                         "node": nodeId,
238                         "name": ToBase32(info.Hash[:]),
239                         "size": strconv.FormatInt(int64(info.Size), 10),
240                 }, "")
241         }
242         ctx.LogI("sp-infos", SDS{
243                 "xx":   string(TTx),
244                 "node": nodeId,
245                 "pkts": strconv.Itoa(len(payloads)),
246                 "size": strconv.FormatInt(totalSize, 10),
247         }, "")
248         return payloadsSplit(payloads)
249 }
250
251 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*SPState, error) {
252         err := ctx.ensureRxDir(nodeId)
253         if err != nil {
254                 return nil, err
255         }
256         var rxLock *os.File
257         if xxOnly != nil && *xxOnly == TRx {
258                 rxLock, err = ctx.LockDir(nodeId, TRx)
259                 if err != nil {
260                         return nil, err
261                 }
262         }
263         var txLock *os.File
264         if xxOnly != nil && *xxOnly == TTx {
265                 txLock, err = ctx.LockDir(nodeId, TTx)
266                 if err != nil {
267                         return nil, err
268                 }
269         }
270         started := time.Now()
271         conf := noise.Config{
272                 CipherSuite: NoiseCipherSuite,
273                 Pattern:     noise.HandshakeIK,
274                 Initiator:   true,
275                 StaticKeypair: noise.DHKey{
276                         Private: ctx.Self.NoisePrv[:],
277                         Public:  ctx.Self.NoisePub[:],
278                 },
279                 PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
280         }
281         state := SPState{
282                 ctx:        ctx,
283                 hs:         noise.NewHandshakeState(conf),
284                 NodeId:     nodeId,
285                 nice:       nice,
286                 payloads:   make(chan []byte),
287                 infosTheir: make(map[[32]byte]*SPInfo),
288                 started:    started,
289                 rxLock:     rxLock,
290                 txLock:     txLock,
291                 xxOnly:     xxOnly,
292         }
293
294         var infosPayloads [][]byte
295         if xxOnly == nil || *xxOnly != TTx {
296                 infosPayloads = ctx.infosOur(nodeId, nice)
297         }
298         var firstPayload []byte
299         if len(infosPayloads) > 0 {
300                 firstPayload = infosPayloads[0]
301         }
302         // Pad first payload, to hide actual existing files
303         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
304                 firstPayload = append(firstPayload, SPHaltMarshalized...)
305         }
306
307         var buf []byte
308         var payload []byte
309         buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
310         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
311         ctx.LogD("sp-start", sds, "sending first message")
312         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
313         if err = state.WriteSP(conn, buf); err != nil {
314                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
315                 state.dirUnlock()
316                 return nil, err
317         }
318         ctx.LogD("sp-start", sds, "waiting for first message")
319         conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
320         if buf, err = state.ReadSP(conn); err != nil {
321                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
322                 state.dirUnlock()
323                 return nil, err
324         }
325         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
326         if err != nil {
327                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
328                 state.dirUnlock()
329                 return nil, err
330         }
331         ctx.LogD("sp-start", sds, "starting workers")
332         err = state.StartWorkers(conn, infosPayloads, payload)
333         if err != nil {
334                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
335                 state.dirUnlock()
336                 return nil, err
337         }
338         return &state, err
339 }
340
341 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*SPState, error) {
342         started := time.Now()
343         conf := noise.Config{
344                 CipherSuite: NoiseCipherSuite,
345                 Pattern:     noise.HandshakeIK,
346                 Initiator:   false,
347                 StaticKeypair: noise.DHKey{
348                         Private: ctx.Self.NoisePrv[:],
349                         Public:  ctx.Self.NoisePub[:],
350                 },
351         }
352         state := SPState{
353                 ctx:        ctx,
354                 hs:         noise.NewHandshakeState(conf),
355                 nice:       nice,
356                 payloads:   make(chan []byte),
357                 infosTheir: make(map[[32]byte]*SPInfo),
358                 started:    started,
359                 xxOnly:     xxOnly,
360         }
361         var buf []byte
362         var payload []byte
363         var err error
364         ctx.LogD(
365                 "sp-start",
366                 SDS{"nice": strconv.Itoa(int(nice))},
367                 "waiting for first message",
368         )
369         conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
370         if buf, err = state.ReadSP(conn); err != nil {
371                 ctx.LogE("sp-start", SDS{"err": err}, "")
372                 return nil, err
373         }
374         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
375                 ctx.LogE("sp-start", SDS{"err": err}, "")
376                 return nil, err
377         }
378
379         var nodeId *NodeId
380         for _, node := range ctx.Neigh {
381                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
382                         nodeId = node.Id
383                         break
384                 }
385         }
386         if nodeId == nil {
387                 peerId := ToBase32(state.hs.PeerStatic())
388                 ctx.LogE("sp-start", SDS{"peer": peerId}, "unknown")
389                 return nil, errors.New("Unknown peer: " + peerId)
390         }
391         state.NodeId = nodeId
392         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
393
394         if ctx.ensureRxDir(nodeId); err != nil {
395                 return nil, err
396         }
397         var rxLock *os.File
398         if xxOnly != nil && *xxOnly == TRx {
399                 rxLock, err = ctx.LockDir(nodeId, TRx)
400                 if err != nil {
401                         return nil, err
402                 }
403         }
404         state.rxLock = rxLock
405         var txLock *os.File
406         if xxOnly != nil && *xxOnly == TTx {
407                 txLock, err = ctx.LockDir(nodeId, TTx)
408                 if err != nil {
409                         return nil, err
410                 }
411         }
412         state.txLock = txLock
413
414         var infosPayloads [][]byte
415         if xxOnly == nil || *xxOnly != TTx {
416                 infosPayloads = ctx.infosOur(nodeId, nice)
417         }
418         var firstPayload []byte
419         if len(infosPayloads) > 0 {
420                 firstPayload = infosPayloads[0]
421         }
422         // Pad first payload, to hide actual existing files
423         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
424                 firstPayload = append(firstPayload, SPHaltMarshalized...)
425         }
426
427         ctx.LogD("sp-start", sds, "sending first message")
428         buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
429         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
430         if err = state.WriteSP(conn, buf); err != nil {
431                 ctx.LogE("sp-start", SdsAdd(sds, SDS{"err": err}), "")
432                 state.dirUnlock()
433                 return nil, err
434         }
435         ctx.LogD("sp-start", sds, "starting workers")
436         err = state.StartWorkers(conn, infosPayloads, payload)
437         if err != nil {
438                 state.dirUnlock()
439                 return nil, err
440         }
441         return &state, err
442 }
443
444 func (state *SPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
445         sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
446         if len(infosPayloads) > 1 {
447                 go func() {
448                         for _, payload := range infosPayloads[1:] {
449                                 state.ctx.LogD(
450                                         "sp-work",
451                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
452                                         "queuing remaining payload",
453                                 )
454                                 state.payloads <- payload
455                         }
456                 }()
457         }
458         state.ctx.LogD(
459                 "sp-work",
460                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
461                 "processing first payload",
462         )
463         replies, err := state.ProcessSP(payload)
464         if err != nil {
465                 state.ctx.LogE("sp-work", SdsAdd(sds, SDS{"err": err}), "")
466                 return err
467         }
468         go func() {
469                 for _, reply := range replies {
470                         state.ctx.LogD(
471                                 "sp-work",
472                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
473                                 "queuing reply",
474                         )
475                         state.payloads <- reply
476                 }
477         }()
478         state.wg.Add(1)
479         go func() {
480                 defer state.wg.Done()
481                 for {
482                         if state.isDead() {
483                                 return
484                         }
485                         var payload []byte
486                         select {
487                         case payload = <-state.payloads:
488                                 state.ctx.LogD(
489                                         "sp-xmit",
490                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
491                                         "got payload",
492                                 )
493                         default:
494                         }
495                         if payload == nil {
496                                 state.RLock()
497                                 if len(state.queueTheir) == 0 {
498                                         state.ctx.LogD("sp-xmit", sds, "file queue is empty")
499                                         state.RUnlock()
500                                         time.Sleep(100 * time.Millisecond)
501                                         continue
502                                 }
503                                 freq := state.queueTheir[0]
504                                 state.RUnlock()
505                                 sdsp := SdsAdd(sds, SDS{
506                                         "xx":   string(TTx),
507                                         "hash": ToBase32(freq.Hash[:]),
508                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
509                                 })
510                                 state.ctx.LogD("sp-file", sdsp, "queueing")
511                                 fd, err := os.Open(filepath.Join(
512                                         state.ctx.Spool,
513                                         state.NodeId.String(),
514                                         string(TTx),
515                                         ToBase32(freq.Hash[:]),
516                                 ))
517                                 if err != nil {
518                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
519                                         break
520                                 }
521                                 fi, err := fd.Stat()
522                                 if err != nil {
523                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
524                                         break
525                                 }
526                                 fullSize := uint64(fi.Size())
527                                 var buf []byte
528                                 if freq.Offset < fullSize {
529                                         state.ctx.LogD("sp-file", sdsp, "seeking")
530                                         if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
531                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
532                                                 break
533                                         }
534                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
535                                         n, err := fd.Read(buf)
536                                         if err != nil {
537                                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
538                                                 break
539                                         }
540                                         buf = buf[:n]
541                                         state.ctx.LogD(
542                                                 "sp-file",
543                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
544                                                 "read",
545                                         )
546                                 }
547                                 fd.Close()
548                                 payload = MarshalSP(SPTypeFile, SPFile{
549                                         Hash:    freq.Hash,
550                                         Offset:  freq.Offset,
551                                         Payload: buf,
552                                 })
553                                 ourSize := freq.Offset + uint64(len(buf))
554                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
555                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
556                                 state.ctx.LogP("sp-file", sdsp, "")
557                                 state.Lock()
558                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
559                                         if ourSize == fullSize {
560                                                 state.ctx.LogD("sp-file", sdsp, "finished")
561                                                 if len(state.queueTheir) > 1 {
562                                                         state.queueTheir = state.queueTheir[1:]
563                                                 } else {
564                                                         state.queueTheir = state.queueTheir[:0]
565                                                 }
566                                         } else {
567                                                 state.queueTheir[0].Offset += uint64(len(buf))
568                                         }
569                                 } else {
570                                         state.ctx.LogD("sp-file", sdsp, "queue disappeared")
571                                 }
572                                 state.Unlock()
573                         }
574                         state.ctx.LogD(
575                                 "sp-xmit",
576                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
577                                 "sending",
578                         )
579                         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
580                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
581                                 state.ctx.LogE("sp-xmit", SdsAdd(sds, SDS{"err": err}), "")
582                                 break
583                         }
584                 }
585         }()
586         state.wg.Add(1)
587         go func() {
588                 defer state.wg.Done()
589                 for {
590                         if state.isDead() {
591                                 return
592                         }
593                         state.ctx.LogD("sp-recv", sds, "waiting for payload")
594                         conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
595                         payload, err := state.ReadSP(conn)
596                         if err != nil {
597                                 unmarshalErr := err.(*xdr.UnmarshalError)
598                                 netErr, ok := unmarshalErr.Err.(net.Error)
599                                 if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO {
600                                         continue
601                                 } else {
602                                         state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
603                                         break
604                                 }
605                         }
606                         state.ctx.LogD(
607                                 "sp-recv",
608                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
609                                 "got payload",
610                         )
611                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
612                         if err != nil {
613                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
614                                 break
615                         }
616                         state.ctx.LogD(
617                                 "sp-recv",
618                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
619                                 "processing",
620                         )
621                         replies, err := state.ProcessSP(payload)
622                         if err != nil {
623                                 state.ctx.LogE("sp-recv", SdsAdd(sds, SDS{"err": err}), "")
624                                 break
625                         }
626                         go func() {
627                                 for _, reply := range replies {
628                                         state.ctx.LogD(
629                                                 "sp-recv",
630                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
631                                                 "queuing reply",
632                                         )
633                                         state.payloads <- reply
634                                 }
635                         }()
636                 }
637         }()
638         return nil
639 }
640
641 func (state *SPState) Wait() {
642         state.wg.Wait()
643         state.dirUnlock()
644         state.Duration = time.Now().Sub(state.started)
645         state.RxSpeed = state.RxBytes
646         state.TxSpeed = state.TxBytes
647         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
648         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
649         if rxDuration > 0 {
650                 state.RxSpeed = state.RxBytes / rxDuration
651         }
652         if txDuration > 0 {
653                 state.TxSpeed = state.TxBytes / txDuration
654         }
655 }
656
657 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
658         sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
659         r := bytes.NewReader(payload)
660         var err error
661         var replies [][]byte
662         var infosGot bool
663         for r.Len() > 0 {
664                 state.ctx.LogD("sp-process", sds, "unmarshaling header")
665                 var head SPHead
666                 if _, err = xdr.Unmarshal(r, &head); err != nil {
667                         state.ctx.LogE("sp-process", SdsAdd(sds, SDS{"err": err}), "")
668                         return nil, err
669                 }
670                 switch head.Type {
671                 case SPTypeInfo:
672                         infosGot = true
673                         sdsp := SdsAdd(sds, SDS{"type": "info"})
674                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
675                         var info SPInfo
676                         if _, err = xdr.Unmarshal(r, &info); err != nil {
677                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
678                                 return nil, err
679                         }
680                         sdsp = SdsAdd(sds, SDS{
681                                 "hash": ToBase32(info.Hash[:]),
682                                 "size": strconv.FormatInt(int64(info.Size), 10),
683                         })
684                         if info.Nice > state.nice {
685                                 state.ctx.LogD("sp-process", sdsp, "too nice")
686                                 continue
687                         }
688                         state.ctx.LogD("sp-process", sdsp, "received")
689                         if state.xxOnly != nil && *state.xxOnly == TTx {
690                                 continue
691                         }
692                         state.Lock()
693                         state.infosTheir[*info.Hash] = &info
694                         state.Unlock()
695                         state.ctx.LogD("sp-process", sdsp, "stating part")
696                         if _, err = os.Stat(filepath.Join(
697                                 state.ctx.Spool,
698                                 state.NodeId.String(),
699                                 string(TRx),
700                                 ToBase32(info.Hash[:]),
701                         )); err == nil {
702                                 state.ctx.LogD("sp-process", sdsp, "already done")
703                                 replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
704                                 continue
705                         }
706                         fi, err := os.Stat(filepath.Join(
707                                 state.ctx.Spool,
708                                 state.NodeId.String(),
709                                 string(TRx),
710                                 ToBase32(info.Hash[:])+PartSuffix,
711                         ))
712                         var offset int64
713                         if err == nil {
714                                 offset = fi.Size()
715                                 state.ctx.LogD(
716                                         "sp-process",
717                                         SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
718                                         "part exists",
719                                 )
720                         }
721                         replies = append(replies, MarshalSP(
722                                 SPTypeFreq,
723                                 SPFreq{info.Hash, uint64(offset)},
724                         ))
725                 case SPTypeFile:
726                         state.ctx.LogD(
727                                 "sp-process",
728                                 SdsAdd(sds, SDS{"type": "file"}),
729                                 "unmarshaling packet",
730                         )
731                         var file SPFile
732                         if _, err = xdr.Unmarshal(r, &file); err != nil {
733                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
734                                         "err":  err,
735                                         "type": "file",
736                                 }), "")
737                                 return nil, err
738                         }
739                         sdsp := SdsAdd(sds, SDS{
740                                 "xx":   string(TRx),
741                                 "hash": ToBase32(file.Hash[:]),
742                                 "size": strconv.Itoa(len(file.Payload)),
743                         })
744                         filePath := filepath.Join(
745                                 state.ctx.Spool,
746                                 state.NodeId.String(),
747                                 string(TRx),
748                                 ToBase32(file.Hash[:]),
749                         )
750                         state.ctx.LogD("sp-file", sdsp, "opening part")
751                         fd, err := os.OpenFile(
752                                 filePath+PartSuffix,
753                                 os.O_RDWR|os.O_CREATE,
754                                 os.FileMode(0600),
755                         )
756                         if err != nil {
757                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
758                                 return nil, err
759                         }
760                         state.ctx.LogD(
761                                 "sp-file",
762                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
763                                 "seeking",
764                         )
765                         if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
766                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
767                                 fd.Close()
768                                 return nil, err
769                         }
770                         state.ctx.LogD("sp-file", sdsp, "writing")
771                         _, err = fd.Write(file.Payload)
772                         if err != nil {
773                                 state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "")
774                                 fd.Close()
775                                 return nil, err
776                         }
777                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
778                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
779                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
780                         state.ctx.LogP("sp-file", sdsp, "")
781                         if state.infosTheir[*file.Hash].Size != ourSize {
782                                 fd.Close()
783                                 continue
784                         }
785                         go func() {
786                                 if err := fd.Sync(); err != nil {
787                                         state.ctx.LogE("sp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
788                                         fd.Close()
789                                         return
790                                 }
791                                 state.wg.Add(1)
792                                 defer state.wg.Done()
793                                 fd.Seek(0, 0)
794                                 state.ctx.LogD("sp-file", sdsp, "checking")
795                                 gut, err := Check(fd, file.Hash[:])
796                                 fd.Close()
797                                 if err != nil || !gut {
798                                         state.ctx.LogE("sp-file", sdsp, "checksum mismatch")
799                                         return
800                                 }
801                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
802                                 os.Rename(filePath+PartSuffix, filePath)
803                                 go func() {
804                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
805                                 }()
806                         }()
807                 case SPTypeDone:
808                         state.ctx.LogD(
809                                 "sp-process",
810                                 SdsAdd(sds, SDS{"type": "done"}),
811                                 "unmarshaling packet",
812                         )
813                         var done SPDone
814                         if _, err = xdr.Unmarshal(r, &done); err != nil {
815                                 state.ctx.LogE("sp-process", SdsAdd(sds, SDS{
816                                         "type": "done",
817                                         "err":  err,
818                                 }), "")
819                                 return nil, err
820                         }
821                         sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
822                         state.ctx.LogD("sp-done", sdsp, "removing")
823                         err := os.Remove(filepath.Join(
824                                 state.ctx.Spool,
825                                 state.NodeId.String(),
826                                 string(TTx),
827                                 ToBase32(done.Hash[:]),
828                         ))
829                         if err == nil {
830                                 state.ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
831                         } else {
832                                 state.ctx.LogE("sp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
833                         }
834                 case SPTypeFreq:
835                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
836                         state.ctx.LogD("sp-process", sdsp, "unmarshaling packet")
837                         var freq SPFreq
838                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
839                                 state.ctx.LogE("sp-process", SdsAdd(sdsp, SDS{"err": err}), "")
840                                 return nil, err
841                         }
842                         state.ctx.LogD("sp-process", SdsAdd(sdsp, SDS{
843                                 "hash":   ToBase32(freq.Hash[:]),
844                                 "offset": strconv.FormatInt(int64(freq.Offset), 10),
845                         }), "queueing")
846                         state.Lock()
847                         state.queueTheir = append(state.queueTheir, &freq)
848                         state.Unlock()
849                 case SPTypeHalt:
850                         sdsp := SdsAdd(sds, SDS{"type": "halt"})
851                         state.ctx.LogD("sp-process", sdsp, "")
852                         state.Lock()
853                         state.queueTheir = nil
854                         state.Unlock()
855                 default:
856                         state.ctx.LogE(
857                                 "sp-process",
858                                 SdsAdd(sds, SDS{"type": head.Type}),
859                                 "unknown",
860                         )
861                         return nil, BadPktType
862                 }
863         }
864         if infosGot {
865                 var pkts int
866                 var size uint64
867                 for _, info := range state.infosTheir {
868                         pkts++
869                         size += info.Size
870                 }
871                 state.ctx.LogI("sp-infos", SDS{
872                         "xx":   string(TRx),
873                         "node": state.NodeId,
874                         "pkts": strconv.Itoa(pkts),
875                         "size": strconv.FormatInt(int64(size), 10),
876                 }, "")
877         }
878         return payloadsSplit(replies), nil
879 }