Writing an Event Loop in Go
Event loops have been around for a long time. From Nginx to Node.js to Redis, they are everywhere, and for good reason: event loops provide an efficient way to implement high-performance systems.
In this post, we will be implementing a simple single-threaded TCP echo server that uses an event loop to handle multiple connections.
I am assuming you are familiar with what an event loop is (you should be if you’re reading this post :P) and have a basic understanding of network programming.
Async I/O
The gist of async I/O is that I/O is done in a non-blocking way: the server won’t block while waiting for data on a socket. The kernel provides us with a bunch of primitives to achieve async I/O.
kqueue
kqueue is an event notification API provided by the kernel and is supported on macOS and the BSDs. Linux and Windows have similar APIs, namely epoll and IOCP respectively. Their APIs differ from that of kqueue but achieve a similar result.
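Before kqueue and epoll we had poll and select, but they are not as efficient as the newer APIs because the full set of descriptors has to be passed to the kernel, and scanned by it, on every call.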
The kqueue API
We have two syscalls to interact with the kqueue API:
- kqueue() creates a new queue and returns a file descriptor.
- kevent() has two uses: it can be used to register new events and to poll for new events.
An event has the following structure:
struct kevent {
	uintptr_t ident;   /* identifier for this event */
	short     filter;  /* filter for event */
	u_short   flags;   /* action flags for kqueue */
	u_int     fflags;  /* filter flag value */
	int64_t   data;    /* filter data value */
	void      *udata;  /* opaque user data identifier */
	uint64_t  ext[4];  /* extensions */
};
We use the above structure to create a new event and then register that event using the kevent() syscall.
Let’s go over the different fields of the event structure:
- ident: Holds the file descriptor which we want to monitor.
- filter: Describes the event we want to listen for, e.g. read, write, etc.
- flags: Describes what we want to do with the event, e.g. add it to the queue, remove it from the queue, etc.
- fflags: Filter-specific flags.
- data: Filter-specific data value. For a read event on a socket, this is the number of bytes available to read.
- udata: Arbitrary user-defined data.
- ext: Extended data passed to and from the kernel.
If you want detailed documentation on kqueue, the official man page is a great resource.
Writing the Event Loop
Note: Because I am using a Mac, I will be using kqueue to implement the event loop. As a result, the code will not work on Linux or Windows.
Full code can be found here.
type Socket struct {
	fd int
}
type Server struct {
	socket *Socket
}
We have a Socket type that holds the file descriptor of the socket, and a Server type that holds the socket on which the server will listen for new connections.
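func NewServer(host string, port int) (*Server, error) {
	fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP)
	if err != nil {
		return nil, err
	}
	socket := &Socket{fd: fd}
	addr := syscall.SockaddrInet4{
		Port: port,
	}
	// net.ParseIP returns a 16-byte slice even for IPv4 addresses,
	// so convert it to the 4-byte form before copying.
	ip := net.ParseIP(host).To4()
	if ip == nil {
		syscall.Close(fd)
		return nil, errors.New("host must be a valid IPv4 address")
	}
	copy(addr.Addr[:], ip)
	// Bind can fail (for example if the port is already in use), so check it.
	if err := syscall.Bind(socket.fd, &addr); err != nil {
		syscall.Close(fd)
		return nil, err
	}
	err = syscall.Listen(socket.fd, syscall.SOMAXCONN)
	if err != nil {
		syscall.Close(fd)
		return nil, err
	}
	return &Server{socket: socket}, nil
}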
Here we are doing a few things. First, we create a new socket: syscall.AF_INET means that we are using IPv4, syscall.SOCK_STREAM means that we want a connection-oriented socket, and syscall.IPPROTO_TCP marks the socket as a TCP socket.
syscall.Socket returns a file descriptor for the new socket. We then bind the socket to the host and port passed by the caller.
syscall.Listen might look like it starts listening on the newly created socket, but what it actually does is mark the socket as passive, meaning that this socket will be used for accepting new connections.
We also have a few helper methods for the Server and Socket types which are pretty self-explanatory.
// Close closes the server socket
func (s *Server) Close() error {
	return syscall.Close(s.socket.fd)
}

// Fd returns the file descriptor of the socket
func (s *Socket) Fd() int {
	return s.fd
}
The EventLoop Type
type EventLoop struct {
	kqueueFd int
	sockFd   int
}
The EventLoop type holds the file descriptors for the kqueue and the server socket.
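func NewEventLoop(sockFd int) (*EventLoop, error) {
	// Create the kqueue; the returned descriptor identifies it in later calls.
	kqueueFd, err := syscall.Kqueue()
	if err != nil {
		return nil, err
	}
	loop := &EventLoop{kqueueFd: kqueueFd, sockFd: sockFd}
	// Ask to be notified whenever the server socket becomes readable,
	// which for a listening socket means a connection is waiting.
	socketEvent := syscall.Kevent_t{
		Ident:  uint64(loop.sockFd),
		Filter: syscall.EVFILT_READ,
		Flags:  syscall.EV_ADD | syscall.EV_ENABLE,
	}
	// Passing a change list and no event list registers without polling.
	r, err := syscall.Kevent(loop.kqueueFd, []syscall.Kevent_t{socketEvent}, nil, nil)
	if err != nil {
		syscall.Close(kqueueFd)
		return nil, err
	}
	if r == -1 {
		syscall.Close(kqueueFd)
		return nil, errors.New("failed to register socket with kqueue")
	}
	return loop, nil
}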
There are a lot of things happening here, so let’s go step by step.
- First, we create a new kqueue using syscall.Kqueue, which returns the file descriptor of the new kqueue.
- Then we instantiate a new kqueue event using the syscall.Kevent_t struct. It asks for read events (syscall.EVFILT_READ) on the server socket.
- Finally, we register that event with the kqueue using syscall.Kevent.
The important thing to note is that syscall.Kevent is used both to register events and to poll for events, as we will see in a moment.
syscall.Kevent has the following signature:
syscall.Kevent(kqueueFileDescriptor, listOfEventsToRegister, listOfReadyEvents, timeout)
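func (e *EventLoop) Start() {
	// Reuse a single slice for the events returned by the kernel.
	events := make([]syscall.Kevent_t, 1)
	for {
		// With a nil timeout, Kevent blocks until at least one event is ready.
		numEvents, err := syscall.Kevent(e.kqueueFd, nil, events, nil)
		if err != nil {
			continue
		}
		for i := 0; i < numEvents; i++ {
			event := events[i]
			eventFd := int(event.Ident)
			if event.Flags&syscall.EV_EOF != 0 {
				// The peer closed the connection. Closing the descriptor
				// also removes its events from the kqueue.
				syscall.Close(eventFd)
			} else if eventFd == e.sockFd {
				// The listening socket is readable: a new connection is waiting.
				sockFd, _, err := syscall.Accept(eventFd)
				if err != nil {
					continue
				}
				sockEvent := syscall.Kevent_t{
					Ident:  uint64(sockFd),
					Filter: syscall.EVFILT_READ,
					Flags:  syscall.EV_ADD | syscall.EV_ENABLE,
				}
				r, err := syscall.Kevent(e.kqueueFd, []syscall.Kevent_t{sockEvent}, nil, nil)
				if err != nil || r == -1 {
					// Don't leak the connection if it could not be registered.
					syscall.Close(sockFd)
					continue
				}
			} else if event.Filter == syscall.EVFILT_READ {
				// Filters are plain values, not bit flags, so compare with ==.
				buf := make([]byte, 1024)
				n, err := syscall.Read(eventFd, buf)
				if err != nil {
					continue
				}
				// Echo back whatever was read.
				syscall.Write(eventFd, buf[:n])
			}
		}
	}
}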
This is the main event loop. Let’s go through the code step by step.
- We start an infinite loop and use syscall.Kevent to poll for new events. The events slice is populated with the ready events, and the number of ready events is returned by syscall.Kevent.
- We then loop over all the events in the events slice and process them one by one.
- We check whether the other end has closed the connection by ANDing event.Flags with syscall.EV_EOF. If it has, we close the file descriptor.
- If eventFd is equal to sockFd, there is an event on our server socket, and since we marked our server socket as passive using syscall.Listen, this means that there is a new connection. We accept the new connection using syscall.Accept and register the resulting file descriptor with the kqueue.
- Lastly, if a file descriptor is ready to be read, we read the data into a buffer and write it back, which is what makes this an echo server.
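One detail worth spelling out: kqueue filters are small negative integers, not bit flags, so a filter is identified by equality rather than by a bitwise AND. The sketch below illustrates the difference. The constants evfiltRead and evfiltWrite are local stand-ins that mirror the values Darwin uses for syscall.EVFILT_READ and syscall.EVFILT_WRITE, redefined here (an assumption of this sketch) so that it compiles on any platform.

```go
package main

import "fmt"

// Local stand-ins mirroring Darwin's syscall.EVFILT_READ and
// syscall.EVFILT_WRITE values; redefined so the sketch is portable.
const (
	evfiltRead  int16 = -1
	evfiltWrite int16 = -2
)

// isReadFilter reports whether a filter value identifies a read event.
func isReadFilter(filter int16) bool {
	return filter == evfiltRead
}

// maskMatchesRead shows the pitfall: -1 has every bit set, so ANDing with it
// returns the filter unchanged and the result is non-zero for every filter.
func maskMatchesRead(filter int16) bool {
	return filter&evfiltRead != 0
}

func main() {
	fmt.Println("equality:", isReadFilter(evfiltRead), isReadFilter(evfiltWrite))
	fmt.Println("bitmask: ", maskMatchesRead(evfiltRead), maskMatchesRead(evfiltWrite))
}
```

In an echo server that only ever registers read filters the two checks happen to behave the same, but the equality check is the one that stays correct once write filters or timers are added.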
This is a basic event loop that only accepts connections on a single listening socket. Production-ready event loops can listen for many types of events (file I/O, timers, etc.), but the basic concept remains the same under the hood.