upnpigd.py 4.17 KB
Newer Older
1
from functools import wraps
2
import logging, random, socket, time
Guillaume Bury's avatar
Guillaume Bury committed
3 4
import miniupnpc

5

6 7 8 9
class UPnPException(Exception):
    """Error raised in place of any exception coming from the UPnP client."""


10
class Forwarder:
    """
    Keep a set of port forwarding rules registered on a UPnP gateway.

    External port is chosen pseudo-randomly between 32768 & 40959 included
    (32768 plus the output of a linear congruential generator modulo 8192).

    Attributes that are not defined here are looked up on the underlying
    miniupnpc.UPnP object and returned wrapped, so that any exception raised
    when calling them is converted to UPnPException.
    """

    # Timestamp (time.time() scale) that select() hands out together with
    # refresh(); _refresh() forces a renewal of all rules once it is passed.
    next_refresh = 0
    # State of gateway discovery, as used by refresh():
    #   -1 -> nothing tried yet (truthy and always in the past)
    #    0 -> a gateway is selected, mappings can be refreshed directly
    #   >0 -> last discovery failed, do not retry before this timestamp
    _next_retry = -1
    # Current state of the linear congruential generator (shared by all
    # instances); 0 means that new parameters must be drawn.
    _lcg_n = 0

    @classmethod
    def _getExternalPort(cls) -> int:
        """Return the next candidate external port, in 32768..40959."""
        # Since _refresh() does not test all ports in a row, we prefer to
        # return random ports to maximize the chance to find a free port.
        # A linear congruential generator should be random enough, without
        # wasting memory/cpu by keeping a full 'shuffle'd list of integers.
        n = cls._lcg_n
        if not n:
            # Odd increment and multiplier congruent to 1 modulo 4: with a
            # power-of-two modulus this gives a full-period generator, so
            # every port of the range is visited once before 0 comes back.
            cls._lcg_a = 1 + 4 * random.randrange(0, 2048)
            cls._lcg_c = 1 + 2 * random.randrange(0, 4096)
        n = cls._lcg_n = (n * cls._lcg_a + cls._lcg_c) % 8192
        return 32768 + n

    def __init__(self, description):
        """
        description -- text used as prefix of the description of every
                       port mapping created by this object
        """
        self._description = description
        self._u = miniupnpc.UPnP()
        self._u.discoverdelay = 200
        # List of [local_port, proto, external_port]; external_port is None
        # as long as the rule is not forwarded.
        self._rules = []

    def __getattr__(self, name: str):
        """
        Return attribute 'name' of the miniupnpc object, wrapped in a
        function that re-raises any exception as UPnPException (same
        message, original exception kept as __cause__).

        Raise AttributeError if the miniupnpc object is not set yet.
        """
        # Only reached when normal lookup fails. Reading self._u here would
        # recurse forever if '_u' itself is missing (instance created
        # without __init__, e.g. by copy/pickle), so go through __dict__.
        try:
            upnp = self.__dict__['_u']
        except KeyError:
            raise AttributeError(name) from None
        wrapped = getattr(upnp, name)

        def wrapper(*args, **kw):
            try:
                return wrapped(*args, **kw)
            except Exception as e:
                # Callers compare str() of the exception with error names
                # like 'ConflictInMappingEntry': keep the message as is.
                raise UPnPException(str(e)) from e
        return wraps(wrapped)(wrapper)

    def select(self, r, w, t):
        """Append the (next_refresh, refresh) pair to list 't'.

        'r' and 'w' are not used.
        """
        t.append((self.next_refresh, self.refresh))

    def checkExternalIp(self, ip=None):
        """
        Return (socket.AF_INET, addresses) for the registered rules.

        ip -- external IPv4 address; if false, it is obtained by calling
              refresh()

        'addresses' is a list of (ip, port, proto), one per rule, where port
        is a string: the external port if the rule is forwarded, else the
        local one. If 'ip' is not a valid IPv4 address (including None when
        refresh() failed), 'addresses' is an empty tuple.
        """
        if not ip:
            ip = self.refresh()
        try:
            socket.inet_aton(ip)
        except (socket.error, TypeError):
            # TypeError: refresh() returned None
            ip = ()
        return socket.AF_INET, ip and [(ip, str(port or local), proto)
            for local, proto, port in self._rules]

    def addRule(self, local_port, proto):
        """Register a rule; it is only forwarded by a later refresh."""
        self._rules.append([local_port, proto, None])

    def refresh(self):
        """
        Refresh port mappings, (re)discovering the gateway if needed.

        Return the external IP address on success, else None. UPnP errors
        are logged, not raised: after a failed discovery, nothing is tried
        again during the next 60 seconds.
        """
        if self._next_retry:
            if time.time() < self._next_retry:
                return None
            self._next_retry = 0
        else:
            # A gateway is already selected: try it first.
            try:
                return self._refresh()
            except UPnPException:
                logging.debug("UPnP failure", exc_info=True)
                self.clear()
        # Either no gateway was selected or the selected one just failed.
        try:
            self.discover()
            self.selectigd()
            return self._refresh()
        except UPnPException as e:
            self.next_refresh = self._next_retry = time.time() + 60
            logging.info(str(e))
            self.clear()
        return None

    def _refresh(self):
        """
        Add missing port mappings on the selected gateway and, if
        next_refresh is passed, renew the existing ones (next_refresh is
        then set 500 seconds later).

        Return the external IP address reported by the gateway.
        Raise UPnPException if the gateway fails or if no free external
        port is found within the limits described below.
        """
        t = time.time()
        force = self.next_refresh < t
        if force:
            self.next_refresh = t + 500
            logging.debug('Refreshing port forwarding')
        ip = self.externalipaddress()
        # Plain attribute: read it directly rather than via __getattr__,
        # which would return a wrapper function.
        lanaddr = self._u.lanaddr
        # It's too expensive (CPU/network) to try a full range every minute
        # when the router really has no free port. Or with slow routers, it
        # can take more than 15 minutes. So let's use some saner limits:
        t += 1
        retry = 15
        # Both limits are shared by all rules of this call.
        for r in self._rules:
            local, proto, port = r
            if port and not force:
                continue
            desc = '%s (%u/%s)' % (self._description, local, proto)
            args = proto.upper(), lanaddr, local, desc, ''
            while True:
                if port is None:
                    if not retry or t < time.time():
                        raise UPnPException('No free port to redirect %s'
                                            % desc)
                    retry -= 1
                    port = self._getExternalPort()
                try:
                    self.addportmapping(port, *args)
                    break
                except UPnPException as e:
                    if str(e) != 'ConflictInMappingEntry':
                        raise
                    # Port already taken: pick another one.
                    port = None
            if r[2] != port:
                logging.debug('%s forwarded from %s:%u', desc, ip, port)
                r[2] = port
        return ip

    def clear(self):
        """
        Forget the external port of every rule and ask the gateway to delete
        the corresponding mappings, ignoring UPnP errors (best effort).
        """
        for r in self._rules:
            port = r[2]
            if port:
                r[2] = None
                try:
                    self.deleteportmapping(port, r[1].upper())
                except UPnPException:
                    pass