Newer
Older
Listens for JSON objects of the form {'to':<irc-url>, 'privmsg':<text>}
and relays messages to IRC channels. Each request must be followed by
a newline.
The <text> must be a string. The value of the 'to' attribute can be a
string containing an IRC URL (e.g. 'irc://chat.freenet.net/botwar') or
a list of such strings; in the latter case the message is broadcast to
all listed channels. Note that the channel portion of the URL need
*not* have a leading '#' unless the channel name itself does.
Design and code by Eric S. Raymond <esr@thyrsus.com>. See the project
resource page at <http://www.catb.org/~esr/irker/>.
Requires Python 2.7, or:
* 2.6 with the argparse package installed.
# These things might need tuning
HOST = "localhost"
XMIT_TTL = (3 * 60 * 60) # Time to live, seconds from last transmit
PING_TTL = (15 * 60) # Time to live, seconds from last PING
HANDSHAKE_TTL = 60 # Time to live, seconds from nick transmit
CHANNEL_TTL = (3 * 60 * 60) # Time to live, seconds from last transmit
DISCONNECT_TTL = (24 * 60 * 60) # Time to live, seconds from last connect
UNSEEN_TTL = 60 # Time to live, seconds since first request
CHANNEL_MAX = 18 # Max channels open per socket (default)
ANTI_FLOOD_DELAY = 1.0 # Anti-flood delay after transmissions, seconds
ANTI_BUZZ_DELAY = 0.09 # Anti-buzz delay after queue-empty check
CONNECTION_MAX = 200 # To avoid hitting a thread limit
# No user-serviceable parts below this line
try: # Python 3
import queue
except ImportError: # Python 2
import Queue as queue
import random
import re
import select
import signal
import socket
try: # Python 3
import socketserver
except ImportError: # Python 2
import SocketServer as socketserver
import sys
import threading
import time
try: # Python 3
import urllib.parse as urllib_parse
except ImportError: # Python 2
import urlparse as urllib_parse
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.ERROR)
LOG_LEVELS = ['critical', 'error', 'warning', 'info', 'debug']
# One Irker object manages multiple IRC sessions. It holds a map of
# Dispatcher objects, one per (server, port) combination, which are
# responsible for routing messages to one of any number of Connection
# objects that do the actual socket conversations. The reason for the
# Dispatcher layer is that IRC daemons limit the number of channels a
# client (that is, from the daemon's point of view, a socket) can be
# joined to, so each session to a server needs a flock of Connection
# instances each with its own socket.
# Connections are timed out and removed when either they haven't seen a
# PING for a while (indicating that the server may be stalled or down)
# or there has been no message traffic to them for a while, or
# even if the queue is nonempty but efforts to connect have failed for
# a long time.
# There are multiple threads. One accepts incoming traffic from all
# servers. Each Connection also has a consumer thread and a
# thread-safe message queue. The program main appends messages to
# queues as JSON requests are received; the consumer threads try to
# ship them to servers. When a socket write stalls, it only blocks an
# individual consumer thread; if it stalls long enough, the session
# will be timed out. This solves the biggest problem with a
# single-threaded implementation, which is that you can't count on a
# single stalled write not hanging all other traffic - you're at the
# mercy of the length of the buffers in the TCP/IP layer.
# Message delivery is thus not reliable in the face of network stalls,
# but this was considered acceptable because IRC (notoriously) has the
# same problem - there is little point in reliable delivery to a relay
# that is down or unreliable.
# This code uses only NICK, JOIN, PART, MODE, PRIVMSG, USER, and QUIT.
# It is strictly compliant to RFC1459, except for the interpretation and
# use of the DEAF and CHANLIMIT and (obsolete) MAXCHANNELS features.
#
# CHANLIMIT is as described in the Internet RFC draft
# draft-brocklesby-irc-isupport-03 at <http://www.mirc.com/isupport.html>.
# The ",isnick" feature is as described in
# <http://ftp.ics.uci.edu/pub/ietf/uri/draft-mirashi-url-irc-01.txt>.
# Historical note: the IRCClient and IRCServerConnection classes
# (~270LOC) replace the overweight, overcomplicated 3KLOC mass of
# irclib code that irker formerly used as a service library. They
# still look similar to parts of irclib because I contributed to that
# code before giving up on it.
class IRCError(Exception):
"An IRC exception"
pass
class InvalidRequest (ValueError):
"An invalid JSON request"
pass
class IRCClient():
"An IRC client session to one or more servers."
self.mutex = threading.RLock()
self.server_connections = []
self.event_handlers = {}
self.add_event_handler("ping",
lambda c, e: c.ship("PONG %s" % e.target))
def newserver(self):
"Initialize a new server-connection object."
conn = IRCServerConnection(self)
with self.mutex:
self.server_connections.append(conn)
return conn
"Spin processing data from connections forever."
# Outer loop should specifically *not* be mutex-locked.
# Otherwise no other thread would ever be able to change
# the shared state of an IRC object running this function.
connected = [x for x in self.server_connections
if x is not None and x.socket is not None]
sockets = [x.socket for x in connected]
connmap = dict([(c.socket.fileno(), c) for c in connected])
(insocks, _o, _e) = select.select(sockets, [], [], timeout)
for s in insocks:
connmap[s.fileno()].consume()
def add_event_handler(self, event, handler):
"Set a handler to be called later."
with self.mutex:
event_handlers = self.event_handlers.setdefault(event, [])
event_handlers.append(handler)
def handle_event(self, connection, event):
with self.mutex:
h = self.event_handlers
th = sorted(h.get("all_events", []) + h.get(event.type, []))
for handler in th:
handler(connection, event)
def drop_connection(self, connection):
with self.mutex:
self.server_connections.remove(connection)
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
class LineBufferedStream():
"Line-buffer a read stream."
crlf_re = re.compile(b'\r?\n')
def __init__(self):
self.buffer = ''
def append(self, newbytes):
self.buffer += newbytes
def lines(self):
"Iterate over lines in the buffer."
lines = LineBufferedStream.crlf_re.split(self.buffer)
self.buffer = lines.pop()
return iter(lines)
def __iter__(self):
return self.lines()
class IRCServerConnectionError(IRCError):
pass
class IRCServerConnection():
command_re = re.compile("^(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?")
# The full list of numeric-to-event mappings is in Perl's Net::IRC.
# We only need to ensure that if some ancient server throws numerics
# for the ones we actually want to catch, they're mapped.
codemap = {
"001": "welcome",
"005": "featurelist",
"432": "erroneusnickname",
"433": "nicknameinuse",
"436": "nickcollision",
"437": "unavailresource",
}
def __init__(self, master):
self.master = master
self.socket = None
def connect(self, target, nickname,
password=None, username=None, ircname=None):
LOG.debug("connect(server=%r, port=%r, nickname=%r, ...)" % (
target.servername, target.port, nickname))
if self.socket is not None:
self.disconnect("Changing servers")
self.buffer = LineBufferedStream()
self.event_handlers = {}
self.real_server_name = ""
self.target = target
self.nickname = nickname
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.bind(('', 0))
self.socket.connect((target.servername, target.port))
except socket.error as err:
raise IRCServerConnectionError("Couldn't connect to socket: %s" % err)
if password:
self.ship("PASS " + password)
self.user(username=username or ircname, realname=ircname or nickname)
return self
def close(self):
# Without this thread lock, there is a window during which
# select() can find a closed socket, leading to an EBADF error.
with self.master.mutex:
self.disconnect("Closing object")
self.master.drop_connection(self)
def consume(self):
try:
incoming = self.socket.recv(16384)
except socket.error:
# Server hung up on us.
self.disconnect("Connection reset by peer")
return
if not incoming:
# Dead air also indicates a connection reset.
self.disconnect("Connection reset by peer")
return
self.buffer.append(incoming)
for line in self.buffer:
LOG.debug("FROM: %s" % line)
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
if not line:
continue
prefix = None
command = None
arguments = None
self.handle_event(Event("every_raw_message",
self.real_server_name,
None,
[line]))
m = IRCServerConnection.command_re.match(line)
if m.group("prefix"):
prefix = m.group("prefix")
if not self.real_server_name:
self.real_server_name = prefix
if m.group("command"):
command = m.group("command").lower()
if m.group("argument"):
a = m.group("argument").split(" :", 1)
arguments = a[0].split()
if len(a) == 2:
arguments.append(a[1])
command = IRCServerConnection.codemap.get(command, command)
if command in ["privmsg", "notice"]:
target = arguments.pop(0)
else:
target = None
if command == "quit":
arguments = [arguments[0]]
elif command == "ping":
target = arguments[0]
else:
target = arguments[0]
arguments = arguments[1:]
LOG.debug("command: %s, source: %s, target: %s, arguments: %s" % (
command, prefix, target, arguments))
self.handle_event(Event(command, prefix, target, arguments))
def handle_event(self, event):
self.master.handle_event(self, event)
if event.type in self.event_handlers:
for fn in self.event_handlers[event.type]:
fn(self, event)
def is_connected(self):
return self.socket is not None
def disconnect(self, message=""):
if self.socket is None:
return
# Don't send a QUIT here - causes infinite loop!
try:
self.socket.shutdown(socket.SHUT_WR)
self.socket.close()
except socket.error:
pass
del self.socket
self.socket = None
self.handle_event(
Event("disconnect", self.target.server, "", [message]))
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
def join(self, channel, key=""):
self.ship("JOIN %s%s" % (channel, (key and (" " + key))))
def mode(self, target, command):
self.ship("MODE %s %s" % (target, command))
def nick(self, newnick):
self.ship("NICK " + newnick)
def part(self, channel, message=""):
cmd_parts = ['PART', channel]
if message:
cmd_parts.append(message)
self.ship(' '.join(cmd_parts))
def privmsg(self, target, text):
self.ship("PRIVMSG %s :%s" % (target, text))
def quit(self, message=""):
self.ship("QUIT" + (message and (" :" + message)))
def user(self, username, realname):
self.ship("USER %s 0 * :%s" % (username, realname))
def ship(self, string):
"Ship a command to the server, appending CR/LF"
try:
self.socket.send(string.encode('utf-8') + b'\r\n')
LOG.debug("TO: %s" % string)
except socket.error:
self.disconnect("Connection reset by peer.")
class Event(object):
def __init__(self, evtype, source, target, arguments=None):
self.type = evtype
self.source = source
self.target = target
if arguments is None:
arguments = []
self.arguments = arguments
def is_channel(string):
return string and string[0] in "#&+!"
def __init__(self, irker, target, nick_template, nick_needs_number=False,
password=None, **kwargs):
self.irker = irker
self.target = target
self.nick_template = nick_template
self.nick_needs_number = nick_needs_number
self.password = password
self.kwargs = kwargs
self.nick_trial = None
self.connection = None
# The consumer thread
self.queue = queue.Queue()
"Return a name for the nth server connection."
if n is None:
n = self.nick_trial
if self.nick_needs_number:
return (self.nick_template % n)
return self.nick_template
def handle_ping(self):
"Register the fact that the server has pinged this connection."
self.last_ping = time.time()
def handle_welcome(self):
"The server says we're OK, with a non-conflicting nick."
LOG.info("nick %s accepted" % self.nickname())
if self.password:
self.connection.privmsg("nickserv", "identify %s" % self.password)
def handle_badnick(self):
"The server says our nick is ill-formed or has a conflict."
LOG.info("nick %s rejected" % self.nickname())
if self.nick_needs_number:
# Randomness prevents a malicious user or bot from
# anticipating the next trial name in order to block us
# from completing the handshake.
self.nick_trial += random.randint(1, 3)
self.last_xmit = time.time()
self.connection.nick(self.nickname())
# Otherwise fall through, it might be possible to
# recover manually.
def handle_disconnect(self):
"Server disconnected us for flooding or some other reason."
self.connection = None
if self.status != "expired":
self.status = "disconnected"
del self.channels_joined[outof]
except KeyError:
LOG.error("kicked by %s from %s that's not joined" % (
self.target, outof))
Ben Kelly
committed
(channel, message, key) = self.queue.get()
Ben Kelly
committed
qcopy.append((channel, message, key))
for (channel, message, key) in qcopy:
self.queue.put((channel, message, key))
def enqueue(self, channel, message, key, quit_after=False):
"Enque a message for transmission."
if self.thread is None or not self.thread.is_alive():
self.thread = threading.Thread(target=self.dequeue)
self.thread.setDaemon(True)
self.thread.start()
Ben Kelly
committed
self.queue.put((channel, message, key))
if quit_after:
self.queue.put((channel, None, key))
def dequeue(self):
"Try to ship pending messages from the queue."
try:
while True:
# We want to be kind to the IRC servers and not hold unused
# sockets open forever, so they have a time-to-live. The
# loop is coded this particular way so that we can drop
# the actual server connection when its time-to-live
# expires, then reconnect and resume transmission if the
# queue fills up again.
# Queue is empty, at some point we want to time out
# the connection rather than holding a socket open in
# the server forever.
now = time.time()
xmit_timeout = now > self.last_xmit + XMIT_TTL
ping_timeout = now > self.last_ping + PING_TTL
Alexander van Gessel (AI0867)
committed
if self.status == "disconnected":
# If the queue is empty, we can drop this connection.
self.status = "expired"
break
elif xmit_timeout or ping_timeout:
LOG.info((
"timing out connection to %s at %s "
"(ping_timeout=%s, xmit_timeout=%s)") % (
self.target, time.asctime(), ping_timeout,
Eric S. Raymond
committed
with self.irker.irc.mutex:
self.connection.context = None
self.connection.quit("transmission timeout")
self.connection = None
self.status = "disconnected"
else:
# Prevent this thread from hogging the CPU by pausing
# for just a little bit after the queue-empty check.
# As long as this is less that the duration of a human
# reflex arc it is highly unlikely any human will ever
# notice.
time.sleep(ANTI_BUZZ_DELAY)
Alexander van Gessel (AI0867)
committed
elif self.status == "disconnected" \
and time.time() > self.last_xmit + DISCONNECT_TTL:
# Queue is nonempty, but the IRC server might be
# down. Letting failed connections retain queue
# space forever would be a memory leak.
self.status = "expired"
break
elif not self.connection and self.status != "expired":
Eric S. Raymond
committed
with self.irker.irc.mutex:
self.connection = self.irker.irc.newserver()
Eric S. Raymond
committed
self.connection.context = self
# Try to avoid colliding with other instances
self.nick_trial = random.randint(1, 990)
self.channels_joined = {}
try:
# This will throw
# IRCServerConnectionError on failure
self.connection.connect(
target=self.target,
nickname=self.nickname(),
username="irker",
ircname="irker relaying client",
**self.kwargs)
Eric S. Raymond
committed
self.status = "handshaking"
LOG.info("XMIT_TTL bump (%s connection) at %s" % (
self.target, time.asctime()))
Eric S. Raymond
committed
self.last_xmit = time.time()
self.last_ping = time.time()
self.status = "expired"
if time.time() > self.last_xmit + HANDSHAKE_TTL:
self.status = "expired"
else:
# Don't buzz on the empty-queue test while we're
# handshaking
time.sleep(ANTI_BUZZ_DELAY)
elif self.status == "unseen" \
and time.time() > self.last_xmit + UNSEEN_TTL:
# Nasty people could attempt a denial-of-service
# attack by flooding us with requests with invalid
# servernames. We guard against this by rapidly
# expiring connections that have a nonempty queue but
# have never had a successful open.
self.status = "expired"
break
elif self.status == "ready":
Ben Kelly
committed
(channel, message, key) = self.queue.get()
if channel not in self.channels_joined:
Ben Kelly
committed
self.connection.join(channel, key=key)
LOG.info("joining %s on %s." % (channel, self.target))
# None is magic - it's a request to quit the server
if message is None:
self.connection.quit()
# An empty message might be used as a keepalive or
# to join a channel for logging, so suppress the
# privmsg send unless there is actual traffic.
for segment in message.split("\n"):
# Truncate the message if it's too long,
# but we're working with characters here,
# not bytes, so we could be off.
# 500 = 512 - CRLF - 'PRIVMSG ' - ' :'
maxlength = 500 - len(channel)
if len(segment) > maxlength:
segment = segment[:maxlength]
try:
self.connection.privmsg(channel, segment)
except ValueError as err:
LOG.warning((
"irclib rejected a message to %s on %s "
"because: %s") % (
channel, self.target, str(err)))
time.sleep(ANTI_FLOOD_DELAY)
self.last_xmit = self.channels_joined[channel] = time.time()
LOG.info("XMIT_TTL bump (%s transmission) at %s" % (
self.target, time.asctime()))
self.queue.task_done()
elif self.status == "expired":
LOG.error(
"We're expired but still running! This is a bug.")
except Exception as e:
LOG.error("exception %s in thread for %s" % (e, self.target))
# Maybe this should have its own status?
self.status = "expired"
finally:
try:
# Make sure we don't leave any zombies behind
self.connection.close()
except:
# Irclib has a habit of throwing fresh exceptions here. Ignore that
pass
def live(self):
"Should this connection not be scavenged?"
return self.status != "expired"
def joined_to(self, channel):
"Is this connection joined to the specified channel?"
return channel in self.channels_joined
def accepting(self, channel):
"Can this connection accept a join of this channel?"
if self.channel_limits:
match_count = 0
for already in self.channels_joined:
# This obscure code is because the RFCs allow separate limits
# by channel type (indicated by the first character of the name)
# a feature that is almost never actually used.
if already[0] == channel[0]:
match_count += 1
return match_count < self.channel_limits.get(channel[0], CHANNEL_MAX)
else:
return len(self.channels_joined) < CHANNEL_MAX
class Target():
"Represent a transmission target."
def __init__(self, url):
self.url = url
parsed = urllib_parse.urlparse(url)
irchost, _, ircport = parsed.netloc.partition(':')
if not ircport:
ircport = 6667
self.servername = irchost
# IRC channel names are case-insensitive. If we don't smash
# case here we may run into problems later. There was a bug
# observed on irc.rizon.net where an irkerd user specified #Channel,
# got kicked, and irkerd crashed because the server returned
# "#channel" in the notification that our kick handler saw.
self.channel = parsed.path.lstrip('/').lower()
# This deals with a tweak in recent versions of urlparse.
if parsed.fragment:
self.channel += "#" + parsed.fragment
isnick = self.channel.endswith(",isnick")
if isnick:
self.channel = self.channel[:-7]
if self.channel and not isnick and self.channel[0] not in "#&+":
self.channel = "#" + self.channel
Ben Kelly
committed
# support both channel?secret and channel?key=secret
Ben Kelly
committed
if parsed.query:
self.key = re.sub("^key=", "", parsed.query)
self.port = int(ircport)
def __str__(self):
"Represent this instance as a string"
return self.servername or self.url or repr(self)
def validate(self):
"Raise InvalidRequest if the URL is missing a critical component"
if not self.servername:
raise InvalidRequest(
'target URL missing a servername: %r' % self.url)
if not self.channel:
raise InvalidRequest(
'target URL missing a channel: %r' % self.url)
def server(self):
"Return a hashable tuple representing the destination server."
return (self.servername, self.port)
"Manage connections to a particular server-port combination."
def __init__(self, irker, **kwargs):
self.irker = irker
self.kwargs = kwargs
def dispatch(self, channel, message, key, quit_after=False):
"Dispatch messages for our server-port combination."
# First, check if there is room for another channel
# on any of our existing connections.
connections = [x for x in self.connections if x.live()]
eligibles = [x for x in connections if x.joined_to(channel)] \
or [x for x in connections if x.accepting(channel)]
eligibles[0].enqueue(channel, message, key, quit_after)
return
# All connections are full up. Look for one old enough to be
# scavenged.
ancients = []
for connection in connections:
for (chan, age) in connections.channels_joined.items():
if age < time.time() - CHANNEL_TTL:
ancients.append((connection, chan, age))
if ancients:
ancients.sort(key=lambda x: x[2])
(found_connection, drop_channel, _drop_age) = ancients[0]
found_connection.part(drop_channel, "scavenged by irkerd")
del found_connection.channels_joined[drop_channel]
#time.sleep(ANTI_FLOOD_DELAY)
found_connection.enqueue(channel, message, key, quit_after)
# All existing channels had recent activity
newconn = Connection(self.irker, **self.kwargs)
newconn.enqueue(channel, message, key, quit_after)
def live(self):
"Does this server-port combination have any live connections?"
self.connections = [x for x in self.connections if x.live()]
return len(self.connections) > 0
def pending(self):
"Return all connections with pending traffic."
return [x for x in self.connections if not x.queue.empty()]
def last_xmit(self):
"Return the time of the most recent transmission."
return max(x.last_xmit for x in self.connections)
def __init__(self, logfile=None, **kwargs):
self.logfile = logfile
self.kwargs = kwargs
self.irc.add_event_handler("ping", self._handle_ping)
self.irc.add_event_handler("welcome", self._handle_welcome)
self.irc.add_event_handler("erroneusnickname", self._handle_badnick)
self.irc.add_event_handler("nicknameinuse", self._handle_badnick)
self.irc.add_event_handler("nickcollision", self._handle_badnick)
self.irc.add_event_handler("unavailresource", self._handle_badnick)
self.irc.add_event_handler("featurelist", self._handle_features)
self.irc.add_event_handler("disconnect", self._handle_disconnect)
self.irc.add_event_handler("kick", self._handle_kick)
self.irc.add_event_handler("every_raw_message", self._handle_every_raw_message)
self.servers = {}
def thread_launch(self):
thread = threading.Thread(target=self.irc.spin)
Georg Brandl
committed
thread.setDaemon(True)
"PING arrived, bump the last-received time for the connection."
if connection.context:
connection.context.handle_ping()
"Welcome arrived, nick accepted for this connection."
if connection.context:
connection.context.handle_welcome()
if connection.context:
connection.context.handle_badnick()
def _handle_features(self, connection, event):
"Determine if and how we can set deaf mode."
if connection.context:
arguments = event.arguments
for lump in arguments:
if not self.logfile:
connection.mode(cxt.nickname(), "+"+lump[5:])
elif lump.startswith("MAXCHANNELS="):
m = int(lump[12:])
for pref in "#&+":
cxt.channel_limits[pref] = m
LOG.info("%s maxchannels is %d" % (connection.server, m))
elif lump.startswith("CHANLIMIT=#:"):
limits = lump[10:].split(",")
try:
for token in limits:
(prefixes, limit) = token.split(":")
limit = int(limit)
for c in prefixes:
cxt.channel_limits[c] = limit
LOG.info("%s channel limit map is %s" % (
connection.target, cxt.channel_limits))
LOG.error("ill-formed CHANLIMIT property")
def _handle_disconnect(self, connection, _event):
"Server hung up the connection."
LOG.info("server %s disconnected" % connection.target)
if connection.context:
connection.context.handle_disconnect()
def _handle_kick(self, connection, event):
"Server hung up the connection."
target = event.target
LOG.info("irker has been kicked from %s on %s" % (
target, connection.target))
connection.context.handle_kick(target)
def _handle_every_raw_message(self, _connection, event):
"Log all messages when in watcher mode."
if self.logfile:
with open(self.logfile, "a") as logfp:
(time.time(), event.source, event.arguments[0]))
def pending(self):
"Do we have any pending message traffic?"
return [k for (k, v) in self.servers.items() if v.pending()]
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
def _parse_request(self, line):
"Request-parsing helper for the handle() method"
request = json.loads(line.strip())
if not isinstance(request, dict):
raise InvalidRequest(
"request is not a JSON dictionary: %r" % request)
if "to" not in request or "privmsg" not in request:
raise InvalidRequest(
"malformed request - 'to' or 'privmsg' missing: %r" % request)
channels = request['to']
message = request['privmsg']
if not isinstance(channels, (list, basestring)):
raise InvalidRequest(
"malformed request - unexpected channel type: %r" % channels)
if not isinstance(message, basestring):
raise InvalidRequest(
"malformed request - unexpected message type: %r" % message)
if not isinstance(channels, list):
channels = [channels]
targets = []
for url in channels:
try:
if not isinstance(url, basestring):
raise InvalidRequest(
"malformed request - URL has unexpected type: %r" %
url)
target = Target(url)
target.validate()
except InvalidRequest as e:
else:
targets.append(target)
return (targets, message)
targets, message = self._parse_request(line=line)
for target in targets:
if target.server() not in self.servers:
self.servers[target.server()] = Dispatcher(
self, target=target, **self.kwargs)
self.servers[target.server()].dispatch(
target.channel, message, target.key, quit_after=quit_after)
# GC dispatchers with no active connections
servernames = self.servers.keys()
for servername in servernames:
if not self.servers[servername].live():
del self.servers[servername]
# If we might be pushing a resource limit even
# after garbage collection, remove a session. The
# goal here is to head off DoS attacks that aim at
# exhausting thread space or file descriptors.
# The cost is that attempts to DoS this service
# will cause lots of join/leave spam as we
# scavenge old channels after connecting to new
# ones. The particular method used for selecting a
# session to be terminated doesn't matter much; we
# choose the one longest idle on the assumption
# that message activity is likely to be clumpy.
if len(self.servers) >= CONNECTION_MAX:
oldest = min(
self.servers.keys(),
key=lambda name: self.servers[name].last_xmit())
del self.servers[oldest]
except InvalidRequest as e:
self.logerr("can't recognize JSON on input: %r" % line)
self.logerr("wildly malformed JSON blew the parser stack.")
class IrkerTCPHandler(socketserver.StreamRequestHandler):
line = self.rfile.readline()
if not line:
break
irker.handle(line.strip())
class IrkerUDPHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
#socket = self.request[1]
irker.handle(data)
parser = argparse.ArgumentParser(
description=__doc__.strip().splitlines()[0])
parser.add_argument(
'-d', '--log-level', metavar='LEVEL', choices=LOG_LEVELS,
help='file of trusted certificates for SSL/TLS')
parser.add_argument(
'-l', '--log-file', metavar='PATH',
help='file for saving captured message traffic')
parser.add_argument(
'-n', '--nick', metavar='NAME', default='irker%03d',
help="nickname (optionally with a '%%.*d' server connection marker)")
parser.add_argument(
'-p', '--password', metavar='PASSWORD',
help='NickServ password')
parser.add_argument(
'-i', '--immediate', action='store_const', const=True,
help='disconnect after sending each message')
parser.add_argument(
'-V', '--version', action='version',
version='%(prog)s {0}'.format(version))
args = parser.parse_args()
handler = logging.StreamHandler()
LOG.addHandler(handler)
if args.log_level:
log_level = getattr(logging, args.log_level.upper())
irker = Irker(
logfile=args.log_file,
nick_template=args.nick,
nick_needs_number=re.search('%.*d', args.nick),
password=args.password,
LOG.info("irkerd version %s" % version)
irker.irc.add_event_handler("quit", lambda _c, _e: sys.exit(0))
irker.handle('{"to":"%s","privmsg":"%s"}' % (immediate, arguments[0]), quit_after=True)
irker.thread_launch()
tcpserver = socketserver.TCPServer((HOST, PORT), IrkerTCPHandler)
udpserver = socketserver.UDPServer((HOST, PORT), IrkerUDPHandler)
for server in [tcpserver, udpserver]:
server = threading.Thread(target=server.serve_forever)
server.setDaemon(True)
server.start()
try:
signal.pause()
except KeyboardInterrupt:
raise SystemExit(1)
except socket.error as e:
LOG.error("server launch failed: %r\n" % e)