/**
 * Selector provider whose selectors are {@link EPollSelectorImpl} instances.
 */
public class EPollSelectorProvider extends SelectorProviderImpl {

    /**
     * Opens a new selector bound to this provider.
     *
     * @return a freshly constructed {@code EPollSelectorImpl}
     * @throws IOException if constructing the selector fails
     */
    public AbstractSelector openSelector() throws IOException {
        AbstractSelector selector = new EPollSelectorImpl(this);
        return selector;
    }
}
// register one end of the socket pair for wakeups
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
}
// Native entry point for creating the epoll instance; returns an int and may throw IOException.
static native int create() throws IOException; // the epoll "create" call; this is a native method
在 EPoll.c 中可以看到对应的 JNI 实现:
/*
 * JNI implementation of EPoll.create(): calls epoll_create and returns the
 * resulting descriptor. On failure an IOException carrying the last error is
 * raised in the calling Java thread and the negative result is returned.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPoll_create(JNIEnv *env, jclass clazz) {
    /* size hint not used in modern kernels */
    const int sizeHint = 256;
    int fd = epoll_create(sizeHint);

    if (fd >= 0) {
        return fd;
    }

    JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    return fd;
}
可以看到,最终还是调用了操作系统提供的 API:epoll_create 函数。
epoll_wait:等待内核 IO 事件
调用 Selector.select() 时,最终会执行到 EPoll.wait:
/**
 * Performs a selection with the given timeout.
 *
 * @param timeout the timeout value; must not be negative. A value of zero is
 *                forwarded to {@code lockAndDoSelect} as {@code -1}, any other
 *                value is forwarded unchanged.
 * @return the value returned by {@code lockAndDoSelect}
 * @throws IllegalArgumentException if {@code timeout} is negative
 * @throws IOException if the underlying selection fails
 */
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0) {
        throw new IllegalArgumentException("Negative timeout");
    }
    long effectiveTimeout;
    if (timeout == 0) {
        effectiveTimeout = -1;
    } else {
        effectiveTimeout = timeout;
    }
    return lockAndDoSelect(null, effectiveTimeout);
}
/**
 * Runs {@code doSelect} while holding the selector's locks.
 *
 * The monitor of this selector is acquired first, then the monitor of
 * {@code publicSelectedKeys}. The {@code inSelect} flag is set for the
 * duration of the call and always cleared afterwards.
 *
 * @param action  passed through to {@code doSelect}; {@code select(long)}
 *                passes {@code null}
 * @param timeout passed through to {@code doSelect}
 * @return the value returned by {@code doSelect}
 * @throws IllegalStateException if a selection is already in progress
 * @throws IOException if {@code ensureOpen} or {@code doSelect} throws it
 */
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
// reject a nested/re-entrant selection on the same selector
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
// second lock: the selected-key set stays locked while doSelect runs
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
// reset the flag even when doSelect throws
inSelect = false;
}
}
}
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
// epoll_wait timeout is int
int to = (int) Math.min(timeout, Integer.MAX_VALUE);
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
begin(blocking);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
//
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
// events for file descriptors with registration changes pending, indexed
// by file descriptor and stored as bytes for efficiency reasons. For
// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
// least) then the update is stored in a map.
// Event changes are kept in an array whose length is MAX_UPDATE_ARRAY_SIZE
// (at most 64*1024 according to the original annotation).
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
// Events for file descriptors beyond the array length are cached in this
// map until they are next processed.
private Map<Integer,Byte> eventsHigh;
/**
 * Records the pending update events for a file descriptor.
 *
 * Descriptors below {@code MAX_UPDATE_ARRAY_SIZE} are stored in
 * {@code eventsLow}; all others go into the {@code eventsHigh} map. An entry
 * that is currently {@code KILLED} is left untouched unless {@code force} is
 * {@code true}.
 *
 * @param fd     the file descriptor
 * @param events the events value to record
 * @param force  whether to overwrite an entry that is marked KILLED
 */
private void setUpdateEvents(int fd, byte events, boolean force) {
    // large descriptors do not fit the array and live in the map instead
    if (fd >= MAX_UPDATE_ARRAY_SIZE) {
        Integer boxedFd = Integer.valueOf(fd);
        boolean writable = !isEventsHighKilled(boxedFd) || force;
        if (writable) {
            eventsHigh.put(boxedFd, Byte.valueOf(events));
        }
        return;
    }

    boolean writable = (eventsLow[fd] != KILLED) || force;
    if (writable) {
        eventsLow[fd] = events;
    }
}
/**
 * Looks up the pending update events recorded for a file descriptor.
 *
 * @param fd the file descriptor
 * @return the value from {@code eventsLow} when {@code fd} is below
 *         {@code MAX_UPDATE_ARRAY_SIZE}, otherwise the value stored in
 *         {@code eventsHigh}
 */
private byte getUpdateEvents(int fd) {
    if (fd >= MAX_UPDATE_ARRAY_SIZE) {
        // an entry is expected to exist for fd, so it is unboxed without a null check
        Byte boxed = eventsHigh.get(Integer.valueOf(fd));
        return boxed.byteValue();
    }
    return eventsLow[fd];
}
/**
 * Applies every queued registration change through {@code epollCtl} and then
 * empties the queue.
 *
 * For each queued descriptor the pending events are looked up; entries marked
 * {@code KILLED} are skipped. The opcode is chosen from the current
 * registration state and whether any events remain, and the
 * {@code registered} set is updated after an add or a delete.
 */
private void updateRegistrations() {
    synchronized (updateLock) {
        for (int idx = 0; idx < updateCount; idx++) {
            int fd = updateDescriptors[idx];
            // pending events come from eventsLow / eventsHigh
            short events = getUpdateEvents(fd);
            boolean alreadyRegistered = registered.get(fd);

            if (events == KILLED) {
                continue;
            }

            // Choose the operation to hand to epoll_ctl.
            // NOTE(review): per the original annotation EPOLLET is not
            // requested here - confirm against the events values.
            boolean hasEvents = (events != 0);
            int opcode;
            if (alreadyRegistered) {
                opcode = hasEvents ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            } else {
                opcode = hasEvents ? EPOLL_CTL_ADD : 0;
            }

            if (opcode == 0) {
                // not registered and nothing to register
                continue;
            }

            epollCtl(epfd, opcode, fd, events);
            if (opcode == EPOLL_CTL_ADD) {
                registered.set(fd);
            } else if (opcode == EPOLL_CTL_DEL) {
                registered.clear(fd);
            }
        }
        updateCount = 0;
    }
}
// Native method; updateRegistrations calls it with an EPOLL_CTL_ADD/MOD/DEL
// opcode, the file descriptor and its pending events.
private native void epollCtl(int epfd, int opcode, int fd, int events);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
// Edge-triggered mode: keep reading until the (non-blocking) descriptor is drained.
while (true) {
    ret = read(fd, buf, sizeof(buf));
    if (ret > 0) {
        continue;   // data consumed; more may still be pending
    }
    if (ret < 0 && errno == EINTR) {
        continue;   // interrupted by a signal before any data was read; retry
    }
    // ret == 0 means end of file. ret < 0 with errno EAGAIN/EWOULDBLOCK means
    // nothing is left to read for now. Any other errno is a real error.
    // read() reports EAGAIN through errno, never as its return value.
    break;
}
在 SelectorImpl 及其父类 AbstractSelector 中,有 3 个 Set 用于保存 SelectionKey:
// Public views of the key sets
// All registered keys
private Set<SelectionKey> publicKeys; // Immutable
// Keys wrapping the IO events returned by the kernel, i.e. which fds have data to read or can be written
private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition
// Cancelled keys
private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
/**
 * Walks the entries reported by {@code pollWrapper} and folds their ready
 * operations into the matching selection keys.
 *
 * A key already in {@code selectedKeys} counts as updated when
 * {@code translateAndSetReadyOps} returns {@code true}. A key not yet
 * selected is added, and counted, when its ready ops intersect its interest
 * ops after translation.
 *
 * @return the number of keys counted as updated
 */
private int updateSelectedKeys() {
    final int readyCount = pollWrapper.updated;
    int updatedKeys = 0;

    for (int idx = 0; idx < readyCount; idx++) {
        int fd = pollWrapper.getDescriptor(idx);
        SelectionKeyImpl key = fdToKey.get(Integer.valueOf(fd));
        // no key is mapped in the case of an interrupt
        if (key == null) {
            continue;
        }

        int readyOps = pollWrapper.getEventOps(idx);

        if (selectedKeys.contains(key)) {
            if (key.channel.translateAndSetReadyOps(readyOps, key)) {
                updatedKeys++;
            }
            continue;
        }

        key.channel.translateAndSetReadyOps(readyOps, key);
        boolean interested = (key.nioReadyOps() & key.nioInterestOps()) != 0;
        if (interested) {
            selectedKeys.add(key);
            updatedKeys++;
        }
    }
    return updatedKeys;
}