Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Router implementation #29

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 144 additions & 124 deletions tubes/routing.py
Original file line number Diff line number Diff line change
@@ -1,180 +1,200 @@
# -*- test-case-name: tubes.test.test_fan -*-
# -*- test-case-name: tubes.test.test_routing -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
A L{Router} receives items with addressing information and dispatches them to
an appropriate output, stripping the addressing information off.
from zope.interface import implementer

Use like so::
from .kit import Pauser, beginFlowingTo
from .itube import IDrain, IFount

from tubes.routing import Router, Routed, to

aRouter = Router(int)
@implementer(IFount)
class _RouteFount(object):
"""
The concrete fount type returned by L{Router.newRoute}.
"""
drain = None

evens, evenFount = aRouter.newRoute()
odds, oddFount = aRouter.newRoute()
outputType = None

@tube
class EvenOdd(object):
outputType = Routed(int)
def received(self, item):
if (item % 2) == 0:
yield to(evens, item)
else:
yield to(odds, item)
def __init__(self, upstreamPauser, stopper):
"""
@param upstreamPauser: A L{Pauser} which will pause the upstream fount
flowing into our L{Router}.

numbers.flowTo(aRouter)
@param stopper: A 0-argument callback to execute on
L{IFount.stopFlow}
"""
self._receivedWhilePaused = []
self._myPause = None
self._stopper = stopper

This creates a fount in evenFount and oddFount, which each have an outputType
of "int".
def actuallyPause():
self._myPause = upstreamPauser.pause()

Why do this rather than just having C{EvenOdd} just call methods directly based
on whether a number is even or odd?
def actuallyUnpause():
aPause = self._myPause
self._myPause = None
if self._receivedWhilePaused:
self.drain.receive(self._receivedWhilePaused.pop(0))
aPause.unpause()

By using a L{Router}, flow control relationships are automatically preserved by
the same mechanism that tubes usually use. The distinct drains of evenFount
and oddFount can both be independently paused, and the pause state will be
propagated to the "numbers" fount. If you want to send on outputs to multiple
drains which may have complex flow-control interrelationships, you can't do
that by calling the C{receive} method directly since any one of those methods
might reentrantly pause you.
"""
self._pauser = Pauser(actuallyPause, actuallyUnpause)

from .tube import tube, receiver
from .fan import Out

if 0:
from zope.interface.interfaces import IInterface
IInterface
def flowTo(self, drain):
"""
Flow to the given drain. Don't do anything special; just set up the
drain attribute and return the appropriate value.

@param drain: A drain to fan out values to.

@return: the result of C{drain.flowingFrom}
"""
return beginFlowingTo(self, drain)

class Routed(object):
"""
A L{Routed} is an interface describing another interface that has been
wrapped in a C{to}. As such, it is an incomplete implementation of
L{IInterface}.
"""

def __init__(self, interface=None):
def pauseFlow(self):
"""
Derive a L{Routed} version of C{interface}.
Pause the flow.

@param interface: the interface that will be provided by the C{what}
attribute of providers of this interface.
@type interface: L{IInterface}
@return: a pause
@rtype: L{IPause}
"""
self.interface = interface
return self._pauser.pause()


def isOrExtends(self, other):
def stopFlow(self):
"""
Is this L{Routed} substitutable for the given specification?

@param other: Another L{Routed} or interface.
@type other: L{IInterface}

@return: L{True} if so, L{False} if not.
Invoke the callback supplied to C{__init__} for stopping.
"""
if not isinstance(other, Routed):
return False
if self.interface is None or other.interface is None:
return True
return self.interface.isOrExtends(other.interface)
self._stopper(self)


def providedBy(self, instance):
def _deliverOne(self, item):
"""
Is this L{Routed} provided by a particular value?
Deliver one item to this fount's drain.

@param instance: an object which may or may not provide this interface.
@type instance: L{object}
This is only invoked when the upstream is unpaused.

@return: L{True} if so, L{False} if not.
@rtype: L{bool}
@param item: An item that the upstream would like to pass on.
"""
if not isinstance(instance, _To):
return False
if self.interface is None:
return True
return self.interface.providedBy(instance._what)



class _To(object):
if self.drain is None:
return
if self._myPause is not None:
self._receivedWhilePaused.append(item)
return
self.drain.receive(item)

@implementer(IDrain)
class _RouterDrain(object):
"""
An object destined for a specific destination.
An L{_RouterDrain} is the single L{IDrain} associated with a L{Router}.
"""

def __init__(self, where, what):
"""
Create a L{_To} to a particular route with a given value.

@param _where: see L{to}
fount = None

@param _what: see L{to}
def __init__(self, router):
"""
self._where = where
self._what = what
Construct a L{_RouterDrain}.

@param router: the router associated with this drain
@type founts: L{Router}
"""

self._router = router
self._pause = None
self._paused = False

def to(where, what):
"""
Construct a provider of L{Routed}C{(providedBy(where))}.
def _actuallyPause():
if self._paused:
raise NotImplementedError()
self._paused = True
if self.fount is not None:
self._pause = self.fount.pauseFlow()

@see: L{tubes.routing}
def _actuallyResume():
p = self._pause
self._pause = None
self._paused = False
if p is not None:
p.unpause()

@param where: A fount returned from L{Router.newRoute}. This must be
I{exactly} the return value of L{Router.newRoute}, as it is compared by
object identity and not by any feature of L{IFount}.
self._pauser = Pauser(_actuallyPause, _actuallyResume)

@param what: the value to deliver.
@property
def inputType(self):
"""
Implement the C{inputType} property by relaying it to the input type of
the drains.
"""
# TODO: prevent drains from different inputTypes from being added
for fount in self._router._founts:
if fount.drain is not None:
return fount.drain.inputType

@return: a L{Routed} object.
"""
return _To(where, what)

def flowingFrom(self, fount):
"""
The L{Router} associated with this L{_RouterDrain} is now receiving inputs
from the given fount.

@param fount: the new source of input for all drains attached to this
L{Router}.

@tube
class Router(object):
"""
A drain with multiple founts that consumes L{Routed}C{(IX)} from its input
and produces C{IX} to its outputs.
@return: L{None}, as this is a terminal drain.
"""
if self._paused:
p = self._pause
if fount is not None:
self._pause = fount.pauseFlow()
else:
self._pause = None
if p is not None:
p.unpause()
self.fount = fount

@ivar _out: A fan-out that consumes L{Routed}C{(X)} and produces C{X}.
@type _out: L{Out}
def receive(self, item):
"""
Deliver an item to the L{IDrain} attached to the L{RouteFount} via
C{Router().newRoute(...).flowTo(...)}.

@ivar drain: The input to this L{Router}.
@type drain: L{IDrain}
"""
@param item: any object
"""
destination = self._router._getItemDestination(item)
fount = self._router._routes[destination]
fount._deliverOne(item)

def __init__(self, outputType=None):
self._out = Out()
self._outputType = outputType
self.drain = self._out.drain
def flowStopped(self, reason):
for fount in self._router._founts[:]:
if fount.drain is not None:
fount.drain.flowStopped(reason)

class Router(object):

def newRoute(self):
def __init__(self, getItemDestination):
"""
Create a new route.
Create an L{Router}.

A route has two uses; first, it is an L{IFount} that you can flow to a
drain.
@param getItemDestination: the function that is called for each
received message and decodes it's destination address.
@type getItemDestination: function that takes one argument
and returns one value
"""
self._getItemDestination = getItemDestination
self._routes = {} # destination -> fount
self._founts = []
self.drain = _RouterDrain(self)

Second, it is the "where" parameter passed to L{to}. Each value sent
to L{Router.drain} should be a L{to} constructed with a value returned
from this method as the "where" parameter.
def newRoute(self, destination):
"""
Create a new L{IFount} whose drain will receive items from this
L{Router} if the message destination matches the one given.

@return: L{IFount}
@return: a fount associated with this L{Router}.
@rtype: L{IFount}.
"""
@receiver(inputType=Routed(self._outputType),
outputType=self._outputType)
def received(item):
if isinstance(item, to):
if item._where is fount:
yield item._what
fount = self._out.newFount().flowTo(received)
return fount
f = _RouteFount(self.drain._pauser, self._founts.remove)
self._founts.append(f)
self._routes[destination] = f
return f
67 changes: 67 additions & 0 deletions tubes/test/test_routing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# -*- test-case-name: tubes.test.test_routing -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Tests for L{tubes.routing}.
"""

from __future__ import print_function

from twisted.trial.unittest import SynchronousTestCase as TestCase

from ..tube import tube, series
from ..routing import Router

from ..test.util import (FakeFount, FakeDrain)


@tube
class IntStarter(object):
"""
A tube that yields an integer.
"""
def __init__(self, i):
self.i = i
def started(self):
"""
Yield an integer.
"""
yield self.i

def isEven(n):
if n % 2 == 0:
return True
else:
return False

class TestIntRouter(TestCase):
"""
Tests for L{Router}.
"""
def setUp(self):
self.ff = FakeFount()
self.evenDrain = FakeDrain()
self.oddDrain = FakeDrain()

self.router = Router(isEven)
self.oddFount = self.router.newRoute(False)
self.evenFount = self.router.newRoute(True)
self.oddFount.flowTo(self.oddDrain)
self.evenFount.flowTo(self.evenDrain)

def test_odd(self):
"""
Test that the router can successfully route odd numbers.
"""
self.ff.flowTo(series(IntStarter(667), self.router.drain))
self.assertEquals(self.oddDrain.received, [667])
self.assertEquals(self.evenDrain.received, [])

def test_even(self):
"""
Test that the router can successfully route even numbers.
"""
self.ff.flowTo(series(IntStarter(668), self.router.drain))
self.assertEquals(self.evenDrain.received, [668])
self.assertEquals(self.oddDrain.received, [])