1 // Copyright 2010 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
14 // pipeDeadline is an abstraction for handling timeouts.
15 type pipeDeadline struct {
16 mu sync.Mutex // Guards timer and cancel
18 cancel chan struct{} // Must be non-nil
21 func makePipeDeadline() pipeDeadline {
22 return pipeDeadline{cancel: make(chan struct{})}
25 // set sets the point in time when the deadline will time out.
26 // A timeout event is signaled by closing the channel returned by waiter.
27 // Once a timeout has occurred, the deadline can be refreshed by specifying a
28 // t value in the future.
30 // A zero value for t prevents timeout.
31 func (d *pipeDeadline) set(t time.Time) {
35 if d.timer != nil && !d.timer.Stop() {
36 <-d.cancel // Wait for the timer callback to finish and close cancel
40 // Time is zero, then there is no deadline.
41 closed := isClosedChan(d.cancel)
44 d.cancel = make(chan struct{})
49 // Time in the future, setup a timer to cancel in the future.
50 if dur := time.Until(t); dur > 0 {
52 d.cancel = make(chan struct{})
54 d.timer = time.AfterFunc(dur, func() {
60 // Time in the past, so close immediately.
66 // wait returns a channel that is closed when the deadline is exceeded.
67 func (d *pipeDeadline) wait() chan struct{} {
73 func isClosedChan(c <-chan struct{}) bool {
82 type pipeAddr struct{}
84 func (pipeAddr) Network() string { return "pipe" }
85 func (pipeAddr) String() string { return "pipe" }
88 wrMu sync.Mutex // Serialize Write operations
90 // Used by local Read to interact with remote Write.
91 // Successful receive on rdRx is always followed by send on rdTx.
95 // Used by local Write to interact with remote Read.
96 // Successful send on wrTx is always followed by receive on wrRx.
100 once sync.Once // Protects closing localDone
101 localDone chan struct{}
102 remoteDone <-chan struct{}
104 readDeadline pipeDeadline
105 writeDeadline pipeDeadline
108 // Pipe creates a synchronous, in-memory, full duplex
109 // network connection; both ends implement the Conn interface.
110 // Reads on one end are matched with writes on the other,
111 // copying data directly between the two; there is no internal
113 func Pipe() (Conn, Conn) {
114 cb1 := make(chan []byte)
115 cb2 := make(chan []byte)
116 cn1 := make(chan int)
117 cn2 := make(chan int)
118 done1 := make(chan struct{})
119 done2 := make(chan struct{})
122 rdRx: cb1, rdTx: cn1,
123 wrTx: cb2, wrRx: cn2,
124 localDone: done1, remoteDone: done2,
125 readDeadline: makePipeDeadline(),
126 writeDeadline: makePipeDeadline(),
129 rdRx: cb2, rdTx: cn2,
130 wrTx: cb1, wrRx: cn1,
131 localDone: done2, remoteDone: done1,
132 readDeadline: makePipeDeadline(),
133 writeDeadline: makePipeDeadline(),
138 func (*pipe) LocalAddr() Addr { return pipeAddr{} }
139 func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
141 func (p *pipe) Read(b []byte) (int, error) {
143 if err != nil && err != io.EOF && err != io.ErrClosedPipe {
144 err = &OpError{Op: "read", Net: "pipe", Err: err}
149 func (p *pipe) read(b []byte) (n int, err error) {
151 case isClosedChan(p.localDone):
152 return 0, io.ErrClosedPipe
153 case isClosedChan(p.remoteDone):
155 case isClosedChan(p.readDeadline.wait()):
156 return 0, os.ErrDeadlineExceeded
165 return 0, io.ErrClosedPipe
168 case <-p.readDeadline.wait():
169 return 0, os.ErrDeadlineExceeded
173 func (p *pipe) Write(b []byte) (int, error) {
175 if err != nil && err != io.ErrClosedPipe {
176 err = &OpError{Op: "write", Net: "pipe", Err: err}
181 func (p *pipe) write(b []byte) (n int, err error) {
183 case isClosedChan(p.localDone):
184 return 0, io.ErrClosedPipe
185 case isClosedChan(p.remoteDone):
186 return 0, io.ErrClosedPipe
187 case isClosedChan(p.writeDeadline.wait()):
188 return 0, os.ErrDeadlineExceeded
191 p.wrMu.Lock() // Ensure entirety of b is written together
192 defer p.wrMu.Unlock()
193 for once := true; once || len(b) > 0; once = false {
200 return n, io.ErrClosedPipe
202 return n, io.ErrClosedPipe
203 case <-p.writeDeadline.wait():
204 return n, os.ErrDeadlineExceeded
210 func (p *pipe) SetDeadline(t time.Time) error {
211 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
212 return io.ErrClosedPipe
214 p.readDeadline.set(t)
215 p.writeDeadline.set(t)
219 func (p *pipe) SetReadDeadline(t time.Time) error {
220 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
221 return io.ErrClosedPipe
223 p.readDeadline.set(t)
227 func (p *pipe) SetWriteDeadline(t time.Time) error {
228 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
229 return io.ErrClosedPipe
231 p.writeDeadline.set(t)
235 func (p *pipe) Close() error {
236 p.once.Do(func() { close(p.localDone) })