package os_test
import (
+ "errors"
+ "internal/testenv"
+ "io/fs"
"os"
"path/filepath"
+ "strconv"
+ "sync"
"syscall"
"testing"
)
testPipeEOF(t, r, w)
}
+
+// Issue #59545.
+func TestNonPollable(t *testing.T) {
+ if testing.Short() {
+ t.Skip("skipping test with tight loops in short mode")
+ }
+
+ // We need to open a non-pollable file.
+ // This is almost certainly Linux-specific,
+ // but if other systems have non-pollable files,
+ // we can add them here.
+ const nonPollable = "/dev/net/tun"
+
+ f, err := os.OpenFile(nonPollable, os.O_RDWR, 0)
+ if err != nil {
+ if errors.Is(err, fs.ErrNotExist) || errors.Is(err, fs.ErrPermission) || testenv.SyscallIsNotSupported(err) {
+ t.Skipf("can't open %q: %v", nonPollable, err)
+ }
+ t.Fatal(err)
+ }
+ f.Close()
+
+ // On a Linux laptop, before the problem was fixed,
+ // this test failed about 50% of the time with this
+ // number of iterations.
+ // It takes about 1/2 second when it passes.
+ const attempts = 20000
+
+ start := make(chan bool)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ defer wg.Wait()
+ go func() {
+ defer wg.Done()
+ close(start)
+ for i := 0; i < attempts; i++ {
+ f, err := os.OpenFile(nonPollable, os.O_RDWR, 0)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if err := f.Close(); err != nil {
+ t.Error(err)
+ return
+ }
+ }
+ }()
+
+ dir := t.TempDir()
+ <-start
+ for i := 0; i < attempts; i++ {
+ name := filepath.Join(dir, strconv.Itoa(i))
+ if err := syscall.Mkfifo(name, 0o600); err != nil {
+ t.Fatal(err)
+ }
+ // The problem only occurs if we use O_NONBLOCK here.
+ rd, err := os.OpenFile(name, os.O_RDONLY|syscall.O_NONBLOCK, 0o600)
+ if err != nil {
+ t.Fatal(err)
+ }
+ wr, err := os.OpenFile(name, os.O_WRONLY|syscall.O_NONBLOCK, 0o600)
+ if err != nil {
+ t.Fatal(err)
+ }
+ const msg = "message"
+ if _, err := wr.Write([]byte(msg)); err != nil {
+ if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.ENOBUFS) {
+ t.Logf("ignoring write error %v", err)
+ rd.Close()
+ wr.Close()
+ continue
+ }
+ t.Fatalf("write to fifo %d failed: %v", i, err)
+ }
+ if _, err := rd.Read(make([]byte, len(msg))); err != nil {
+ if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.ENOBUFS) {
+ t.Logf("ignoring read error %v", err)
+ rd.Close()
+ wr.Close()
+ continue
+ }
+ t.Fatalf("read from fifo %d failed; %v", i, err)
+ }
+ if err := rd.Close(); err != nil {
+ t.Fatal(err)
+ }
+ if err := wr.Close(); err != nil {
+ t.Fatal(err)
+ }
+ }
+}
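
On Linux, the new test can be run on its own with the ordinary test runner; it skips itself where /dev/net/tun is absent or cannot be opened:

    go test -run TestNonPollable os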
//
// No heap pointers.
type pollDesc struct {
- _ sys.NotInHeap
- link *pollDesc // in pollcache, protected by pollcache.lock
- fd uintptr // constant for pollDesc usage lifetime
+ _ sys.NotInHeap
+ link *pollDesc // in pollcache, protected by pollcache.lock
+ fd uintptr // constant for pollDesc usage lifetime
+ fdseq atomic.Uintptr // protects against stale pollDesc
// atomicInfo holds bits from closing, rd, and wd,
// which are only ever written while holding the lock,
pollEventErr
pollExpiredReadDeadline
pollExpiredWriteDeadline
+ pollFDSeq // 20-bit field, low 20 bits of fdseq field
+)
+
+const (
+ pollFDSeqBits = 20 // number of bits in pollFDSeq
+ pollFDSeqMask = 1<<pollFDSeqBits - 1 // mask for pollFDSeq
)
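
For illustration, a minimal standalone sketch of how publishInfo and setEventErr share the atomicInfo word between the event bits and the low bits of fdseq (constants copied from the blocks above; the demo value is arbitrary):

    package main

    import "fmt"

    const (
    	pollClosing = 1 << iota
    	pollEventErr
    	pollExpiredReadDeadline
    	pollExpiredWriteDeadline
    	pollFDSeq // == 16, also used as the shift amount for the sequence field
    )

    const (
    	pollFDSeqBits = 20                   // number of bits in pollFDSeq
    	pollFDSeqMask = 1<<pollFDSeqBits - 1 // mask for pollFDSeq
    )

    func main() {
    	fdseq := uintptr(0x2a) // a small sequence number for the demo
    	var info uint32
    	info |= pollExpiredReadDeadline
    	info |= uint32(fdseq&pollFDSeqMask) << pollFDSeq // as in publishInfo
    	xSeq := (info >> pollFDSeq) & pollFDSeqMask      // as in setEventErr
    	fmt.Printf("flags=%#x seq=%#x\n", info&(pollFDSeq-1), xSeq)
    	// Output: flags=0x4 seq=0x2a
    }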
func (i pollInfo) closing() bool { return i&pollClosing != 0 }
if pd.wd < 0 {
info |= pollExpiredWriteDeadline
}
+ info |= uint32(pd.fdseq.Load()&pollFDSeqMask) << pollFDSeq
// Set all of x except the pollEventErr bit.
x := pd.atomicInfo.Load()
}
// setEventErr sets the result of pd.info().eventErr() to b.
-func (pd *pollDesc) setEventErr(b bool) {
+// We only change the error bit if seq == 0 or if seq matches the
+// pollFDSeq bits stored in atomicInfo (issue #59545).
+func (pd *pollDesc) setEventErr(b bool, seq uintptr) {
+ mSeq := uint32(seq & pollFDSeqMask)
x := pd.atomicInfo.Load()
+ xSeq := (x >> pollFDSeq) & pollFDSeqMask
+ if seq != 0 && xSeq != mSeq {
+ return
+ }
for (x&pollEventErr != 0) != b && !pd.atomicInfo.CompareAndSwap(x, x^pollEventErr) {
x = pd.atomicInfo.Load()
+ xSeq := (x >> pollFDSeq) & pollFDSeqMask
+ if seq != 0 && xSeq != mSeq {
+ return
+ }
}
}
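
A runnable toy showing the guard's observable effect: a caller holding a stale sequence number can no longer flip the error bit. The atomic word and the constants stand in for pd.atomicInfo and the values above; this is a sketch, not the runtime code:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    )

    const (
    	pollEventErr  = 1 << 1 // as above
    	pollFDSeq     = 1 << 4 // shift amount for the sequence field
    	pollFDSeqMask = 1<<20 - 1
    )

    var info atomic.Uint32 // stands in for pd.atomicInfo

    func setEventErr(b bool, seq uintptr) {
    	mSeq := uint32(seq & pollFDSeqMask)
    	x := info.Load()
    	if xSeq := (x >> pollFDSeq) & pollFDSeqMask; seq != 0 && xSeq != mSeq {
    		return // stale caller: leave the bit alone
    	}
    	for (x&pollEventErr != 0) != b && !info.CompareAndSwap(x, x^pollEventErr) {
    		x = info.Load()
    		if xSeq := (x >> pollFDSeq) & pollFDSeqMask; seq != 0 && xSeq != mSeq {
    			return
    		}
    	}
    }

    func main() {
    	info.Store(uint32(5) << pollFDSeq)         // published fdseq is 5
    	setEventErr(true, 4)                       // stale sequence: ignored
    	fmt.Println(info.Load()&pollEventErr != 0) // false
    	setEventErr(true, 5)                       // current sequence: applied
    	fmt.Println(info.Load()&pollEventErr != 0) // true
    }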
throw("runtime: blocked read on free polldesc")
}
pd.fd = fd
+ if pd.fdseq.Load() == 0 {
+ // The value 0 is special in setEventErr, so don't use it.
+ pd.fdseq.Store(1)
+ }
pd.closing = false
- pd.setEventErr(false)
+ pd.setEventErr(false, 0)
pd.rseq++
pd.rg.Store(pdNil)
pd.rd = 0
}
func (c *pollCache) free(pd *pollDesc) {
+ // Increment the fdseq field, so that any currently
+ // running netpoll calls will not mark pd as ready.
+ fdseq := pd.fdseq.Load()
+ fdseq = (fdseq + 1) & (1<<taggedPointerBits - 1)
+ pd.fdseq.Store(fdseq)
+
lock(&c.lock)
pd.link = c.first
c.first = pd
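
The mask keeps the incremented sequence within the tag width of a taggedPointer. A tiny sketch of the wrap-around edge case, assuming taggedPointerBits == 19 (its value on typical 64-bit targets; the real constant comes from the runtime's tagged-pointer support):

    package main

    import "fmt"

    const taggedPointerBits = 19 // assumed for this sketch

    func main() {
    	fdseq := uintptr(1<<taggedPointerBits - 1) // sequence at its maximum
    	fdseq = (fdseq + 1) & (1<<taggedPointerBits - 1)
    	fmt.Println(fdseq) // 0, the "no check" value in setEventErr,
    	// which is why alloc bumps a zero fdseq back to 1 before reuse
    }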
lock(&mtxset)
unlock(&mtxpoll)
+ // We don't worry about pd.fdseq here,
+ // as mtxset protects us from stale pollDescs.
+
pd.user = uint32(len(pfds))
pfds = append(pfds, pollfd{fd: int32(fd)})
pds = append(pds, pd)
pfd.events &= ^_POLLOUT
}
if mode != 0 {
- pds[i].setEventErr(pfd.revents == _POLLERR)
+ pds[i].setEventErr(pfd.revents == _POLLERR, 0)
netpollready(&toRun, pds[i], mode)
n--
}
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
- *(**pollDesc)(unsafe.Pointer(&ev.Data)) = pd
+ tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
+ *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}
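
taggedPointerPack, pointer, and tag come from the runtime's tagged-pointer support (tagptr_64bit.go). A simplified standalone illustration of the idea, assuming 8-byte-aligned pointers whose addresses fit in 48 bits and a 19-bit tag; the runtime's actual layout differs in details such as sign extension on amd64:

    package main

    import (
    	"fmt"
    	"unsafe"
    )

    const tagBits = 19 // assumed tag width for this sketch

    type taggedPointer uint64

    var x = 42 // package-level so the laundered pointer below stays valid

    // taggedPointerPack drops the three alignment zeros, moves the
    // address into the high bits, and keeps the low tagBits of the tag.
    func taggedPointerPack(ptr unsafe.Pointer, tag uintptr) taggedPointer {
    	return taggedPointer(uint64(uintptr(ptr))>>3<<tagBits | uint64(tag&(1<<tagBits-1)))
    }

    func (tp taggedPointer) pointer() unsafe.Pointer {
    	return unsafe.Pointer(uintptr(tp) >> tagBits << 3)
    }

    func (tp taggedPointer) tag() uintptr {
    	return uintptr(tp) & (1<<tagBits - 1)
    }

    func main() {
    	tp := taggedPointerPack(unsafe.Pointer(&x), 7)
    	fmt.Println(*(*int)(tp.pointer()), tp.tag()) // 42 7
    }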
mode += 'w'
}
if mode != 0 {
- pd := *(**pollDesc)(unsafe.Pointer(&ev.Data))
- pd.setEventErr(ev.Events == syscall.EPOLLERR)
- netpollready(&toRun, pd, mode)
+ tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
+ pd := (*pollDesc)(tp.pointer())
+ tag := tp.tag()
+ if pd.fdseq.Load() == tag {
+ pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
+ netpollready(&toRun, pd, mode)
+ }
}
}
return toRun
// Integrated network poller (kqueue-based implementation).
import (
+ "internal/goarch"
"runtime/internal/atomic"
"unsafe"
)
ev[0].flags = _EV_ADD | _EV_CLEAR
ev[0].fflags = 0
ev[0].data = 0
- ev[0].udata = (*byte)(unsafe.Pointer(pd))
+
+ if goarch.PtrSize == 4 {
+ // We only have a pointer-sized field to store into,
+ // so on a 32-bit system we get no sequence protection.
+ // TODO(iant): If we notice any problems we could at least
+ // steal the low-order 2 bits for a tiny sequence number.
+ ev[0].udata = (*byte)(unsafe.Pointer(pd))
+ } else {
+ tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
+ ev[0].udata = (*byte)(unsafe.Pointer(uintptr(tp)))
+ }
ev[1] = ev[0]
ev[1].filter = _EVFILT_WRITE
n := kevent(kq, &ev[0], 2, nil, 0, nil)
mode += 'w'
}
if mode != 0 {
- pd := (*pollDesc)(unsafe.Pointer(ev.udata))
- pd.setEventErr(ev.flags == _EV_ERROR)
+ var pd *pollDesc
+ var tag uintptr
+ if goarch.PtrSize == 4 {
+ // No sequence protection on 32-bit systems.
+ // See netpollopen for details.
+ pd = (*pollDesc)(unsafe.Pointer(ev.udata))
+ tag = 0
+ } else {
+ tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
+ pd = (*pollDesc)(tp.pointer())
+ tag = tp.tag()
+ if pd.fdseq.Load() != tag {
+ continue
+ }
+ }
+ pd.setEventErr(ev.flags == _EV_ERROR, tag)
netpollready(&toRun, pd, mode)
}
}
package runtime
import (
+ "internal/goarch"
"runtime/internal/atomic"
"unsafe"
)
// with the interested event set) will unblock port_getn right away
// because of the I/O readiness notification.
pd.user = 0
- r := port_associate(portfd, _PORT_SOURCE_FD, fd, 0, uintptr(unsafe.Pointer(pd)))
+ tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
+ // Note that this won't work on a 32-bit system,
+ // as taggedPointer is always 64 bits but uintptr will be 32 bits.
+ // Fortunately we only support Solaris on amd64.
+ if goarch.PtrSize != 8 {
+ throw("runtime: netpollopen: unsupported pointer size")
+ }
+ r := port_associate(portfd, _PORT_SOURCE_FD, fd, 0, uintptr(tp))
unlock(&pd.lock)
return r
}
return
}
- if events != 0 && port_associate(portfd, _PORT_SOURCE_FD, pd.fd, events, uintptr(unsafe.Pointer(pd))) != 0 {
+ tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
+ if events != 0 && port_associate(portfd, _PORT_SOURCE_FD, pd.fd, events, uintptr(tp)) != 0 {
print("runtime: port_associate failed (errno=", errno(), ")\n")
throw("runtime: netpollupdate failed")
}
if ev.portev_events == 0 {
continue
}
- pd := (*pollDesc)(unsafe.Pointer(ev.portev_user))
+
+ tp := taggedPointer(uintptr(unsafe.Pointer(ev.portev_user)))
+ pd := (*pollDesc)(tp.pointer())
+ if pd.fdseq.Load() != tp.tag() {
+ continue
+ }
var mode, clear int32
if (ev.portev_events & (_POLLIN | _POLLHUP | _POLLERR)) != 0 {
}
func netpollopen(fd uintptr, pd *pollDesc) int32 {
+ // TODO(iant): Consider using taggedPointer on 64-bit systems.
if stdcall4(_CreateIoCompletionPort, fd, iocphandle, uintptr(unsafe.Pointer(pd)), 0) == 0 {
return int32(getlasterror())
}