]> Cypherpunks.ru repositories - gostls13.git/blob - src/net/pipe.go
cmd/compile/internal/inline: score call sites exposed by inlines
[gostls13.git] / src / net / pipe.go
1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package net
6
7 import (
8         "io"
9         "os"
10         "sync"
11         "time"
12 )
13
14 // pipeDeadline is an abstraction for handling timeouts.
15 type pipeDeadline struct {
16         mu     sync.Mutex // Guards timer and cancel
17         timer  *time.Timer
18         cancel chan struct{} // Must be non-nil
19 }
20
21 func makePipeDeadline() pipeDeadline {
22         return pipeDeadline{cancel: make(chan struct{})}
23 }
24
25 // set sets the point in time when the deadline will time out.
26 // A timeout event is signaled by closing the channel returned by waiter.
27 // Once a timeout has occurred, the deadline can be refreshed by specifying a
28 // t value in the future.
29 //
30 // A zero value for t prevents timeout.
31 func (d *pipeDeadline) set(t time.Time) {
32         d.mu.Lock()
33         defer d.mu.Unlock()
34
35         if d.timer != nil && !d.timer.Stop() {
36                 <-d.cancel // Wait for the timer callback to finish and close cancel
37         }
38         d.timer = nil
39
40         // Time is zero, then there is no deadline.
41         closed := isClosedChan(d.cancel)
42         if t.IsZero() {
43                 if closed {
44                         d.cancel = make(chan struct{})
45                 }
46                 return
47         }
48
49         // Time in the future, setup a timer to cancel in the future.
50         if dur := time.Until(t); dur > 0 {
51                 if closed {
52                         d.cancel = make(chan struct{})
53                 }
54                 d.timer = time.AfterFunc(dur, func() {
55                         close(d.cancel)
56                 })
57                 return
58         }
59
60         // Time in the past, so close immediately.
61         if !closed {
62                 close(d.cancel)
63         }
64 }
65
66 // wait returns a channel that is closed when the deadline is exceeded.
67 func (d *pipeDeadline) wait() chan struct{} {
68         d.mu.Lock()
69         defer d.mu.Unlock()
70         return d.cancel
71 }
72
73 func isClosedChan(c <-chan struct{}) bool {
74         select {
75         case <-c:
76                 return true
77         default:
78                 return false
79         }
80 }
81
82 type pipeAddr struct{}
83
84 func (pipeAddr) Network() string { return "pipe" }
85 func (pipeAddr) String() string  { return "pipe" }
86
87 type pipe struct {
88         wrMu sync.Mutex // Serialize Write operations
89
90         // Used by local Read to interact with remote Write.
91         // Successful receive on rdRx is always followed by send on rdTx.
92         rdRx <-chan []byte
93         rdTx chan<- int
94
95         // Used by local Write to interact with remote Read.
96         // Successful send on wrTx is always followed by receive on wrRx.
97         wrTx chan<- []byte
98         wrRx <-chan int
99
100         once       sync.Once // Protects closing localDone
101         localDone  chan struct{}
102         remoteDone <-chan struct{}
103
104         readDeadline  pipeDeadline
105         writeDeadline pipeDeadline
106 }
107
108 // Pipe creates a synchronous, in-memory, full duplex
109 // network connection; both ends implement the Conn interface.
110 // Reads on one end are matched with writes on the other,
111 // copying data directly between the two; there is no internal
112 // buffering.
113 func Pipe() (Conn, Conn) {
114         cb1 := make(chan []byte)
115         cb2 := make(chan []byte)
116         cn1 := make(chan int)
117         cn2 := make(chan int)
118         done1 := make(chan struct{})
119         done2 := make(chan struct{})
120
121         p1 := &pipe{
122                 rdRx: cb1, rdTx: cn1,
123                 wrTx: cb2, wrRx: cn2,
124                 localDone: done1, remoteDone: done2,
125                 readDeadline:  makePipeDeadline(),
126                 writeDeadline: makePipeDeadline(),
127         }
128         p2 := &pipe{
129                 rdRx: cb2, rdTx: cn2,
130                 wrTx: cb1, wrRx: cn1,
131                 localDone: done2, remoteDone: done1,
132                 readDeadline:  makePipeDeadline(),
133                 writeDeadline: makePipeDeadline(),
134         }
135         return p1, p2
136 }
137
138 func (*pipe) LocalAddr() Addr  { return pipeAddr{} }
139 func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
140
141 func (p *pipe) Read(b []byte) (int, error) {
142         n, err := p.read(b)
143         if err != nil && err != io.EOF && err != io.ErrClosedPipe {
144                 err = &OpError{Op: "read", Net: "pipe", Err: err}
145         }
146         return n, err
147 }
148
149 func (p *pipe) read(b []byte) (n int, err error) {
150         switch {
151         case isClosedChan(p.localDone):
152                 return 0, io.ErrClosedPipe
153         case isClosedChan(p.remoteDone):
154                 return 0, io.EOF
155         case isClosedChan(p.readDeadline.wait()):
156                 return 0, os.ErrDeadlineExceeded
157         }
158
159         select {
160         case bw := <-p.rdRx:
161                 nr := copy(b, bw)
162                 p.rdTx <- nr
163                 return nr, nil
164         case <-p.localDone:
165                 return 0, io.ErrClosedPipe
166         case <-p.remoteDone:
167                 return 0, io.EOF
168         case <-p.readDeadline.wait():
169                 return 0, os.ErrDeadlineExceeded
170         }
171 }
172
173 func (p *pipe) Write(b []byte) (int, error) {
174         n, err := p.write(b)
175         if err != nil && err != io.ErrClosedPipe {
176                 err = &OpError{Op: "write", Net: "pipe", Err: err}
177         }
178         return n, err
179 }
180
181 func (p *pipe) write(b []byte) (n int, err error) {
182         switch {
183         case isClosedChan(p.localDone):
184                 return 0, io.ErrClosedPipe
185         case isClosedChan(p.remoteDone):
186                 return 0, io.ErrClosedPipe
187         case isClosedChan(p.writeDeadline.wait()):
188                 return 0, os.ErrDeadlineExceeded
189         }
190
191         p.wrMu.Lock() // Ensure entirety of b is written together
192         defer p.wrMu.Unlock()
193         for once := true; once || len(b) > 0; once = false {
194                 select {
195                 case p.wrTx <- b:
196                         nw := <-p.wrRx
197                         b = b[nw:]
198                         n += nw
199                 case <-p.localDone:
200                         return n, io.ErrClosedPipe
201                 case <-p.remoteDone:
202                         return n, io.ErrClosedPipe
203                 case <-p.writeDeadline.wait():
204                         return n, os.ErrDeadlineExceeded
205                 }
206         }
207         return n, nil
208 }
209
210 func (p *pipe) SetDeadline(t time.Time) error {
211         if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
212                 return io.ErrClosedPipe
213         }
214         p.readDeadline.set(t)
215         p.writeDeadline.set(t)
216         return nil
217 }
218
219 func (p *pipe) SetReadDeadline(t time.Time) error {
220         if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
221                 return io.ErrClosedPipe
222         }
223         p.readDeadline.set(t)
224         return nil
225 }
226
227 func (p *pipe) SetWriteDeadline(t time.Time) error {
228         if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
229                 return io.ErrClosedPipe
230         }
231         p.writeDeadline.set(t)
232         return nil
233 }
234
235 func (p *pipe) Close() error {
236         p.once.Do(func() { close(p.localDone) })
237         return nil
238 }