#!/usr/bin/env python
"""
irkerd - a simple IRC multiplexer daemon

Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
and relays messages to IRC channels. Each request must be followed by
a newline.

The <text> must be a string.  The value of the 'to' attribute can be a
string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
a list of such strings; in the latter case the message is broadcast to
all listed channels.  Note that the channel portion of the URL need
*not* have a leading '#' unless the channel name itself does.

Options: -d sets the debug-message level (probably only of interest to
developers). The -V option prints the program version and exits.

Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
resource page at <http://www.catb.org/~esr/irker/>.

Requires Python 2.6 and the irc client library at version >= 2.0.2: see
http://pypi.python.org/pypi/irc/
"""
# These things might need tuning

HOST = "localhost"              # Interface the request listeners bind to
PORT = 6659                     # Port the request listeners bind to
NAMESTYLE = "irker%03d"         # IRC nick template - must contain '%d'
XMIT_TTL = (3 * 60 * 60)        # Time to live, seconds from last transmit
PING_TTL = (15 * 60)            # Time to live, seconds from last PING
DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
UNSEEN_TTL = 60                 # Time to live, seconds since first request
CHANNEL_MAX = 18                # Max channels open per socket (default)
ANTI_FLOOD_DELAY = 0.5          # Anti-flood delay after transmissions, seconds
ANTI_BUZZ_DELAY = 0.09          # Anti-buzz delay after queue-empty check

# No user-serviceable parts below this line

# This black magic imports support for green threads (coroutines),
# then lets eventlet patch the import library internals, replacing
# "threading" with a coroutine-using imposter.  Threads then become
# ultra-light-weight and cooperatively scheduled.
try:
    import eventlet
    eventlet.monkey_patch()
    green_threads = True
    # With greenlets we don't worry about thread exhaustion, only the
    # file descriptor limit (typically 1024 on modern Unixes). Thus we
    # can handle a lot more concurrent sessions and generate less
    # join/leave spam under heavy load.
    CONNECTION_MAX = 1000
except ImportError:
    # Threads are more expensive if we have to use OS-level ones
    # rather than greenlets.  We need to avoid pushing thread limits
    # as well as fd limits.  See security.txt for discussion.
    CONNECTION_MAX = 200
    green_threads = False

import getopt
import json
import logging
import Queue
import random
import SocketServer
import sys
import threading
import time
import urlparse

import irc.client


version = "1.4"

# Sketch of implementation:
#
# One Irker object manages multiple IRC sessions.  It holds a map of
# Dispatcher objects, one per (server, port) combination, which are
# responsible for routing messages to one of any number of Connection
# objects that do the actual socket conversations.  The reason for the
# Dispatcher layer is that IRC daemons limit the number of channels a
# client (that is, from the daemon's point of view, a socket) can be
# joined to, so each session to a server needs a flock of Connection
# instances each with its own socket.
#
# Connections are timed out and removed when they haven't seen a
# PING for a while (indicating that the server may be stalled or down),
# when there has been no message traffic to them for a while, or
# even when the queue is nonempty but efforts to connect have failed for
# a long time.
#
# There are multiple threads. One accepts incoming traffic from all servers.
# Each Connection also has a consumer thread and a thread-safe message queue.
# The program main appends messages to queues as JSON requests are received;
# the consumer threads try to ship them to servers.  When a socket write
# stalls, it only blocks an individual consumer thread; if it stalls long
# enough, the session will be timed out.
#
# Message delivery is thus not reliable in the face of network stalls,
# but this was considered acceptable because IRC (notoriously) has the
# same problem - there is little point in reliable delivery to a relay
# that is down or unreliable.
#
# This code uses only NICK, JOIN, MODE, and PRIVMSG. It is strictly
# compliant with RFC1459, except for the interpretation and use of the
# DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.  CHANLIMIT
# is as described in the Internet RFC draft
# draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.

class Connection:
    """One IRC session (a single socket) to a particular server.

    Messages are queued as (channel, message) pairs by enqueue() and
    shipped by a daemon consumer thread running dequeue().  The status
    attribute takes the values "unseen" (initial state), "handshaking",
    "ready", "disconnected" and "expired"; once it is "expired" the
    consumer thread has exited and live() returns False.
    """
    def __init__(self, irkerd, servername, port):
        "Record the destination and start the consumer thread."
        self.irker = irkerd
        self.servername = servername
        self.port = port
        # Set for real by dequeue() each time it opens a server connection
        self.nick_trial = None
        self.connection = None
        self.status = "unseen"
        self.last_xmit = time.time()
        self.last_ping = time.time()
        self.channels_joined = []
        # Per-prefix channel limits, filled in from the server's feature list
        self.channel_limits = {}
        # The consumer thread
        self.queue = Queue.Queue()
        self.thread = threading.Thread(target=self.dequeue)
        self.thread.daemon = True
        self.thread.start()
    def nickname(self, n=None):
        "Return a name for the nth server connection."
        if n is None:
            n = self.nick_trial
        return NAMESTYLE % n
    def handle_ping(self):
        "Register the fact that the server has pinged this connection."
        self.last_ping = time.time()
    def handle_welcome(self):
        "The server says we're OK, with a non-conflicting nick."
        self.status = "ready"
        self.irker.debug(1, "nick %s accepted" % self.nickname())
    def handle_badnick(self):
        "The server says our nick has a conflict."
        self.irker.debug(1, "nick %s rejected" % self.nickname())
        # Randomness prevents a malicious user or bot from anticipating the
        # next trial name in order to block us from completing the handshake.
        self.nick_trial += random.randint(1, 3)
        self.connection.nick(self.nickname())
    def handle_disconnect(self):
        "Server disconnected us for flooding or some other reason."
        # dequeue() opens a fresh connection when it sees this is unset
        self.connection = None
    def handle_kick(self, outof):
        "We've been kicked."
        # dequeue() only sleeps while the status is "handshaking", so
        # this parks the consumer while the queue is being filtered.
        self.status = "handshaking"
        try:
            self.channels_joined.remove(outof)
        except ValueError:
            self.irker.logerr("kicked by %s from %s that's not joined" \
                              % (self.servername, outof))
        # Drop every pending message addressed to the channel we lost
        qcopy = []
        while not self.queue.empty():
            (channel, message) = self.queue.get()
            if channel != outof:
                qcopy.append((channel, message))
        for (channel, message) in qcopy:
            self.queue.put((channel, message))
        self.status = "ready"
    def enqueue(self, channel, message):
        "Enqueue a message for transmission."
        self.queue.put((channel, message))
    def dequeue(self):
        "Try to ship pending messages from the queue."
        while True:
            # We want to be kind to the IRC servers and not hold unused
            # sockets open forever, so they have a time-to-live.  The
            # loop is coded this particular way so that we can drop
            # the actual server connection when its time-to-live
            # expires, then reconnect and resume transmission if the
            # queue fills up again.
            if not self.connection:
                self.connection = self.irker.irc.server()
                self.connection.context = self
                # Try to avoid colliding with other instances
                self.nick_trial = random.randint(1, 990)
                # This will throw irc.client.ServerConnectionError on failure
                try:
                    self.connection.connect(self.servername,
                                            self.port,
                                            nickname=self.nickname(),
                                            username="irker",
                                            ircname="irker relaying client")
                    self.status = "handshaking"
                    self.irker.debug(1, "XMIT_TTL bump (%s connection) at %s" % (self.servername, time.asctime()))
                    self.last_xmit = time.time()
                except irc.client.ServerConnectionError:
                    self.status = "disconnected"
            elif self.status == "handshaking":
                # Don't buzz on the empty-queue test while we're handshaking
                time.sleep(ANTI_BUZZ_DELAY)
            elif self.queue.empty():
                # Queue is empty, at some point we want to time out
                # the connection rather than holding a socket open in
                # the server forever.
                now = time.time()
                if now > self.last_xmit + XMIT_TTL \
                       or now > self.last_ping + PING_TTL:
                    self.irker.debug(1, "timing out inactive connection to %s at %s" % (self.servername, time.asctime()))
                    self.connection.context = None
                    self.connection.close()
                    self.connection = None
                    self.status = "disconnected"
                else:
                    # Prevent this thread from hogging the CPU by pausing
                    # for just a little bit after the queue-empty check.
                    # As long as this is less than the duration of a human
                    # reflex arc it is highly unlikely any human will ever
                    # notice.
                    time.sleep(ANTI_BUZZ_DELAY)
            elif self.status == "disconnected" \
                     and time.time() > self.last_xmit + DISCONNECT_TTL:
                # Queue is nonempty, but the IRC server might be down. Letting
                # failed connections retain queue space forever would be a
                # memory leak.
                self.status = "expired"
                break
            elif self.status == "unseen" \
                     and time.time() > self.last_xmit + UNSEEN_TTL:
                # Nasty people could attempt a denial-of-service
                # attack by flooding us with requests with invalid
                # servernames. We guard against this by rapidly
                # expiring connections that have a nonempty queue but
                # have never had a successful open.
                self.status = "expired"
                break
            elif self.status == "ready":
                (channel, message) = self.queue.get()
                if channel not in self.channels_joined:
                    self.channels_joined.append(channel)
                    self.connection.join(channel)
                    self.irker.debug(1, "joining %s on %s." % (channel, self.servername))
                for segment in message.split("\n"):
                    self.connection.privmsg(channel, segment)
                    time.sleep(ANTI_FLOOD_DELAY)
                self.last_xmit = time.time()
                self.irker.debug(1, "XMIT_TTL bump (%s transmission) at %s" % (self.servername, time.asctime()))
            else:
                # Nothing actionable right now (for instance a failed
                # connection with messages queued that has not yet
                # reached DISCONNECT_TTL).  Pause so the loop does not
                # spin the CPU while waiting.
                time.sleep(ANTI_BUZZ_DELAY)
    def live(self):
        "Should this connection not be scavenged?"
        return self.status != "expired"
    def joined_to(self, channel):
        "Is this connection joined to the specified channel?"
        return channel in self.channels_joined
    def accepting(self, channel):
        "Can this connection accept a join of this channel?"
        if self.channel_limits:
            # Server-advertised limits apply per channel-prefix character
            match_count = sum(1 for already in self.channels_joined
                              if already[0] == channel[0])
            return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
        else:
            return len(self.channels_joined) < CHANNEL_MAX

class Target:
    "Represent a transmission target."
    def __init__(self, url):
        """Split an IRC URL into server name, port and channel.

        The port defaults to 6667 when the URL does not give one.
        Raises ValueError if the port part of the URL is not an integer.
        """
        parsed = urlparse.urlparse(url)
        irchost, _, ircport = parsed.netloc.partition(':')
        if not ircport:
            ircport = 6667
        self.servername = irchost
        # server() and the Dispatcher both need the port, as a number
        self.port = int(ircport)
        # IRC channel names are case-insensitive.  If we don't smash
        # case here we may run into problems later. There was a bug
        # observed on irc.rizon.net where an irkerd user specified #Channel,
        # got kicked, and irkerd crashed because the server returned
        # "#channel" in the notification that our kick handler saw.
        self.channel = parsed.path.lstrip('/').lower()
        # Supply the default '#' prefix unless a channel prefix is present
        if self.channel and self.channel[0] not in "#&+":
            self.channel = "#" + self.channel
    def valid(self):
        "Both components must be present for a valid target."
        return self.servername and self.channel
    def server(self):
        "Return a hashable tuple representing the destination server."
        return (self.servername, self.port)
class Dispatcher:
    "Manage connections to a particular server-port combination."
    def __init__(self, irkerd, servername, port):
        "Start with no connections; dispatch() creates them on demand."
        self.irker = irkerd
        self.servername = servername
        self.port = port
        self.connections = []
    def dispatch(self, channel, message):
        "Dispatch messages for our server-port combination."
        connections = [x for x in self.connections if x.live()]
        # Prefer a connection already joined to the channel; failing
        # that, any connection with room for one more channel.
        eligibles = [x for x in connections if x.joined_to(channel)] \
                    or [x for x in connections if x.accepting(channel)]
        if not eligibles:
            # Every live connection is full (or there are none): open another
            newconn = Connection(self.irker,
                                 self.servername,
                                 self.port)
            self.connections.append(newconn)
            eligibles = [newconn]
        eligibles[0].enqueue(channel, message)
    def live(self):
        "Does this server-port combination have any live connections?"
        self.connections = [x for x in self.connections if x.live()]
        return len(self.connections) > 0
    @property
    def last_xmit(self):
        "Most recent last_xmit time over our connections (0 if there are none)."
        # Read as an attribute by Irker.handle() when it looks for the
        # longest-idle dispatcher to scavenge.
        stamps = [x.last_xmit for x in self.connections]
        if not stamps:
            return 0
        return max(stamps)

class Irker:
    "Persistent IRC multiplexer."
    def __init__(self, debuglevel=0):
        "Register IRC event handlers and start the event-processing thread."
        self.debuglevel = debuglevel
        # Map from (servername, port) to the Dispatcher for that server
        self.servers = {}
        self.irc = irc.client.IRC()
        self.irc.add_global_handler("ping", self._handle_ping)
        self.irc.add_global_handler("welcome", self._handle_welcome)
        self.irc.add_global_handler("erroneusnickname", self._handle_badnick)
        self.irc.add_global_handler("nicknameinuse", self._handle_badnick)
        self.irc.add_global_handler("nickcollision", self._handle_badnick)
        self.irc.add_global_handler("unavailresource", self._handle_badnick)
        self.irc.add_global_handler("featurelist", self._handle_features)
        self.irc.add_global_handler("disconnect", self._handle_disconnect)
        self.irc.add_global_handler("kick", self._handle_kick)
        thread = threading.Thread(target=self.irc.process_forever)
        thread.daemon = True
        self.irc._thread = thread
        thread.start()
    def logerr(self, errmsg):
        "Log a processing error."
        sys.stderr.write("irkerd: " + errmsg + "\n")
    def debug(self, level, errmsg):
        "Debugging information."
        if self.debuglevel >= level:
            sys.stderr.write("irkerd: %s\n" % errmsg)
    def _handle_ping(self, connection, _event):
        "PING arrived, bump the last-received time for the connection."
        if connection.context:
            connection.context.handle_ping()
    def _handle_welcome(self, connection, _event):
        "Welcome arrived, nick accepted for this connection."
        if connection.context:
            connection.context.handle_welcome()
    def _handle_badnick(self, connection, _event):
        "Nick not accepted for this connection."
        if connection.context:
            connection.context.handle_badnick()
    def _handle_features(self, connection, event):
        "Set deaf mode if offered and record the server's channel limits."
        cxt = connection.context
        if not cxt:
            return
        for lump in event.arguments():
            if lump.startswith("DEAF="):
                connection.mode(cxt.nickname(), "+"+lump[5:])
            elif lump.startswith("MAXCHANNELS="):
                try:
                    m = int(lump[12:])
                except ValueError:
                    self.logerr("ill-formed MAXCHANNELS property")
                    continue
                # MAXCHANNELS is one limit shared by every channel prefix
                for pref in "#&+":
                    cxt.channel_limits[pref] = m
                self.debug(1, "%s maxchannels is %d" \
                           % (connection.server, m))
            elif lump.startswith("CHANLIMIT="):
                # Value is a comma-separated list of prefixes:limit pairs,
                # where one pair may name several prefixes (e.g. "#&:50").
                limits = lump[10:].split(",")
                try:
                    for token in limits:
                        (prefixes, limit) = token.split(":")
                        limit = int(limit)
                        for c in prefixes:
                            cxt.channel_limits[c] = limit
                    self.debug(1, "%s channel limit map is %s" \
                               % (connection.server, cxt.channel_limits))
                except ValueError:
                    self.logerr("ill-formed CHANLIMIT property")
    def _handle_disconnect(self, connection, _event):
        "Server hung up the connection."
        self.debug(1, "server %s disconnected" % connection.server)
        if connection.context:
            connection.context.handle_disconnect()
    def _handle_kick(self, connection, event):
        "We were kicked from a channel; tell the owning connection."
        self.debug(1, "irker has been kicked from %s on %s" % (event.target(), connection.server))
        if connection.context:
            connection.context.handle_kick(event.target())
    def _last_xmit(self, dispatcher):
        "Most recent transmit time over a dispatcher's connections (0 if none)."
        stamps = [x.last_xmit for x in dispatcher.connections]
        if not stamps:
            return 0
        return max(stamps)
    def _scavenge(self):
        "Drop dead dispatchers, then evict the longest-idle one if at the limit."
        # GC dispatchers with no active connections
        for key in list(self.servers):
            if not self.servers[key].live():
                del self.servers[key]
        # If we might be pushing a resource limit even after garbage
        # collection, remove a session.  The goal here is to head off
        # DoS attacks that aim at exhausting thread space or file
        # descriptors.  The cost is that attempts to DoS this service
        # will cause lots of join/leave spam as we scavenge old
        # channels after connecting to new ones. The particular method
        # used for selecting a session to be terminated doesn't matter
        # much; we choose the one longest idle on the assumption that
        # message activity is likely to be clumpy.
        if self.servers and len(self.servers) >= CONNECTION_MAX:
            oldest = min(self.servers,
                         key=lambda k: self._last_xmit(self.servers[k]))
            del self.servers[oldest]
    def handle(self, line):
        "Perform a JSON relay request."
        try:
            request = json.loads(line.strip())
        except ValueError:
            self.logerr("can't recognize JSON on input: %s" % repr(line))
            return
        except RuntimeError:
            self.logerr("wildly malformed JSON blew the parser stack.")
            return
        text = type(u"")        # the type json.loads gives to JSON strings
        if not isinstance(request, dict):
            self.logerr("request is not a JSON dictionary: %s" % repr(request))
            return
        if "to" not in request or "privmsg" not in request:
            self.logerr("malformed request - 'to' or 'privmsg' missing: %s" % repr(request))
            return
        channels = request['to']
        message = request['privmsg']
        if not isinstance(channels, (list, text)) \
               or not isinstance(message, text):
            self.logerr("malformed request - unexpected types: %s" % repr(request))
            return
        if isinstance(channels, text):
            channels = [channels]
        for url in channels:
            if not isinstance(url, text):
                self.logerr("malformed request - unexpected type: %s" % repr(request))
                continue
            try:
                target = Target(url)
            except ValueError:
                self.logerr("malformed request - invalid IRC URL: %s" % repr(url))
                continue
            if not target.valid():
                # An unusable target abandons the rest of the request
                return
            if target.server() not in self.servers:
                self.servers[target.server()] = Dispatcher(self, target.servername, target.port)
            self.servers[target.server()].dispatch(target.channel, message)
            self._scavenge()
class IrkerTCPHandler(SocketServer.StreamRequestHandler):
    "Relay newline-terminated requests arriving on a TCP stream."
    def handle(self):
        "Pass each received line to the global irker until end of stream."
        while True:
            line = self.rfile.readline()
            if not line:
                # readline() returned nothing: the stream is at EOF
                break
            irker.handle(line.strip())
class IrkerUDPHandler(SocketServer.BaseRequestHandler):
    "Relay a request arriving as a single UDP datagram."
    def handle(self):
        "Pass the stripped datagram payload to the global irker."
        # Element 0 of the request is the payload; element 1 (the
        # socket) is not needed because nothing is sent back.
        payload = self.request[0]
        irker.handle(payload.strip())

if __name__ == '__main__':
    # Parse the command line; bad usage gets a one-line diagnostic
    # and a nonzero exit rather than a traceback.
    debuglvl = 0
    try:
        (options, arguments) = getopt.getopt(sys.argv[1:], "d:V")
    except getopt.GetoptError as err:
        sys.stderr.write("irkerd: %s\n" % err)
        raise SystemExit(1)
    for (opt, val) in options:
        if opt == '-d':		# Enable debug/progress messages
            try:
                debuglvl = int(val)
            except ValueError:
                sys.stderr.write("irkerd: -d requires an integer level\n")
                raise SystemExit(1)
            if debuglvl > 1:
                logging.basicConfig(level=logging.DEBUG)
        elif opt == '-V':	# Emit version and exit
            sys.stdout.write("irkerd version %s\n" % version)
            sys.exit(0)
    # The request handlers reach the multiplexer through this global
    irker = Irker(debuglevel=debuglvl)
    # Serve TCP and UDP requests on the same port, each listener in
    # its own daemon thread.
    tcpserver = SocketServer.TCPServer((HOST, PORT), IrkerTCPHandler)
    tcpthread = threading.Thread(target=tcpserver.serve_forever)
    tcpthread.daemon = True
    tcpthread.start()
    udpserver = SocketServer.UDPServer((HOST, PORT), IrkerUDPHandler)
    udpthread = threading.Thread(target=udpserver.serve_forever)
    udpthread.daemon = True
    udpthread.start()
    # All the work happens in daemon threads; idle here until interrupted.
    try:
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        raise SystemExit(1)