Commit 34c044ff authored by Benjamin Peterson's avatar Benjamin Peterson

remove old metaclass demos

parent 99352748
"""Support Eiffel-style preconditions and postconditions.
For example,
class C:
def m1(self, arg):
require arg > 0
return whatever
ensure Result > arg
can be written (clumsily, I agree) as:
class C(Eiffel):
def m1(self, arg):
return whatever
def m1_pre(self, arg):
assert arg > 0
def m1_post(self, Result, arg):
assert Result > arg
Pre- and post-conditions for a method, being implemented as methods
themselves, are inherited independently from the method. This gives
much of the same effect of Eiffel, where pre- and post-conditions are
inherited when a method is overridden by a derived class. However,
when a derived class in Python needs to extend a pre- or
post-condition, it must manually merge the base class' pre- or
post-condition with that defined in the derived class', for example:
class D(C):
def m1(self, arg):
return arg**2
def m1_post(self, Result, arg):
C.m1_post(self, Result, arg)
assert Result < 100
This gives derived classes more freedom but also more responsibility
than in Eiffel, where the compiler automatically takes care of this.
In Eiffel, pre-conditions combine using contravariance, meaning a
derived class can only make a pre-condition weaker; in Python, this is
up to the derived class. For example, a derived class that takes away
the requirement that arg > 0 could write:
def m1_pre(self, arg):
pass
but one could equally write a derived class that makes a stronger
requirement:
def m1_pre(self, arg):
require arg > 50
It would be easy to modify the classes shown here so that pre- and
post-conditions can be disabled (separately, on a per-class basis).
A different design would have the pre- or post-condition testing
functions return true for success and false for failure. This would
make it possible to implement automatic combination of inherited
and new pre-/post-conditions. All this is left as an exercise to the
reader.
"""
from Meta import MetaClass, MetaHelper, MetaMethodWrapper
class EiffelMethodWrapper(MetaMethodWrapper):
def __init__(self, func, inst):
MetaMethodWrapper.__init__(self, func, inst)
# Note that the following causes recursive wrappers around
# the pre-/post-condition testing methods. These are harmless
# but inefficient; to avoid them, the lookup must be done
# using the class.
try:
self.pre = getattr(inst, self.__name__ + "_pre")
except AttributeError:
self.pre = None
try:
self.post = getattr(inst, self.__name__ + "_post")
except AttributeError:
self.post = None
def __call__(self, *args, **kw):
if self.pre:
self.pre(*args, **kw)
Result = self.func(self.inst, *args, **kw)
if self.post:
self.post(Result, *args, **kw)
return Result
class EiffelHelper(MetaHelper):
__methodwrapper__ = EiffelMethodWrapper
class EiffelMetaClass(MetaClass):
__helper__ = EiffelHelper
Eiffel = EiffelMetaClass('Eiffel', (), {})
def _test():
class C(Eiffel):
def m1(self, arg):
return arg+1
def m1_pre(self, arg):
assert arg > 0, "precondition for m1 failed"
def m1_post(self, Result, arg):
assert Result > arg
x = C()
x.m1(12)
## x.m1(-1)
if __name__ == '__main__':
_test()
"""Enumeration metaclass.
XXX This is very much a work in progress.
"""
import string
class EnumMetaClass:
"""Metaclass for enumeration.
To define your own enumeration, do something like
class Color(Enum):
red = 1
green = 2
blue = 3
Now, Color.red, Color.green and Color.blue behave totally
different: they are enumerated values, not integers.
Enumerations cannot be instantiated; however they can be
subclassed.
"""
def __init__(self, name, bases, dict):
"""Constructor -- create an enumeration.
Called at the end of the class statement. The arguments are
the name of the new class, a tuple containing the base
classes, and a dictionary containing everything that was
entered in the class' namespace during execution of the class
statement. In the above example, it would be {'red': 1,
'green': 2, 'blue': 3}.
"""
for base in bases:
if base.__class__ is not EnumMetaClass:
raise TypeError("Enumeration base class must be enumeration")
bases = [x for x in bases if x is not Enum]
self.__name__ = name
self.__bases__ = bases
self.__dict = {}
for key, value in dict.items():
self.__dict[key] = EnumInstance(name, key, value)
def __getattr__(self, name):
"""Return an enumeration value.
For example, Color.red returns the value corresponding to red.
XXX Perhaps the values should be created in the constructor?
This looks in the class dictionary and if it is not found
there asks the base classes.
The special attribute __members__ returns the list of names
defined in this class (it does not merge in the names defined
in base classes).
"""
if name == '__members__':
return list(self.__dict.keys())
try:
return self.__dict[name]
except KeyError:
for base in self.__bases__:
try:
return getattr(base, name)
except AttributeError:
continue
raise AttributeError(name)
def __repr__(self):
s = self.__name__
if self.__bases__:
s = s + '(' + string.join([x.__name__ for x in self.__bases__], ", ") + ')'
if self.__dict:
list = []
for key, value in self.__dict.items():
list.append("%s: %s" % (key, int(value)))
s = "%s: {%s}" % (s, string.join(list, ", "))
return s
class EnumInstance:
"""Class to represent an enumeration value.
EnumInstance('Color', 'red', 12) prints as 'Color.red' and behaves
like the integer 12 when compared, but doesn't support arithmetic.
XXX Should it record the actual enumeration rather than just its
name?
"""
def __init__(self, classname, enumname, value):
self.__classname = classname
self.__enumname = enumname
self.__value = value
def __int__(self):
return self.__value
def __repr__(self):
return "EnumInstance(%r, %r, %r)" % (self.__classname,
self.__enumname,
self.__value)
def __str__(self):
return "%s.%s" % (self.__classname, self.__enumname)
def __cmp__(self, other):
return cmp(self.__value, int(other))
# Create the base class for enumerations.
# It is an empty enumeration.
Enum = EnumMetaClass("Enum", (), {})
def _test():
class Color(Enum):
red = 1
green = 2
blue = 3
print(Color.red)
print(dir(Color))
print(Color.red == Color.red)
print(Color.red == Color.blue)
print(Color.red == 1)
print(Color.red == 2)
class ExtendedColor(Color):
white = 0
orange = 4
yellow = 5
purple = 6
black = 7
print(ExtendedColor.orange)
print(ExtendedColor.red)
print(Color.red == ExtendedColor.red)
class OtherColor(Enum):
white = 4
blue = 5
class MergedColor(Color, OtherColor):
pass
print(MergedColor.red)
print(MergedColor.white)
print(Color)
print(ExtendedColor)
print(OtherColor)
print(MergedColor)
if __name__ == '__main__':
_test()
"""Generic metaclass.
XXX This is very much a work in progress.
"""
import types
class MetaMethodWrapper:
def __init__(self, func, inst):
self.func = func
self.inst = inst
self.__name__ = self.func.__name__
def __call__(self, *args, **kw):
return self.func(self.inst, *args, **kw)
class MetaHelper:
__methodwrapper__ = MetaMethodWrapper # For derived helpers to override
def __helperinit__(self, formalclass):
self.__formalclass__ = formalclass
def __getattr__(self, name):
# Invoked for any attr not in the instance's __dict__
try:
raw = self.__formalclass__.__getattr__(name)
except AttributeError:
try:
ga = self.__formalclass__.__getattr__('__usergetattr__')
except (KeyError, AttributeError):
raise AttributeError(name)
return ga(self, name)
if type(raw) != types.FunctionType:
return raw
return self.__methodwrapper__(raw, self)
class MetaClass:
"""A generic metaclass.
This can be subclassed to implement various kinds of meta-behavior.
"""
__helper__ = MetaHelper # For derived metaclasses to override
__inited = 0
def __init__(self, name, bases, dict):
try:
ga = dict['__getattr__']
except KeyError:
pass
else:
dict['__usergetattr__'] = ga
del dict['__getattr__']
self.__name__ = name
self.__bases__ = bases
self.__realdict__ = dict
self.__inited = 1
def __getattr__(self, name):
try:
return self.__realdict__[name]
except KeyError:
for base in self.__bases__:
try:
return base.__getattr__(name)
except AttributeError:
pass
raise AttributeError(name)
def __setattr__(self, name, value):
if not self.__inited:
self.__dict__[name] = value
else:
self.__realdict__[name] = value
def __call__(self, *args, **kw):
inst = self.__helper__()
inst.__helperinit__(self)
try:
init = inst.__getattr__('__init__')
except AttributeError:
init = lambda: None
init(*args, **kw)
return inst
Meta = MetaClass('Meta', (), {})
def _test():
class C(Meta):
def __init__(self, *args):
print("__init__, args =", args)
def m1(self, x):
print("m1(x=%r)" % (x,))
print(C)
x = C()
print(x)
x.m1(12)
class D(C):
def __getattr__(self, name):
if name[:2] == '__': raise AttributeError(name)
return "getattr:%s" % name
x = D()
print(x.foo)
print(x._foo)
## print x.__foo
## print x.__foo__
if __name__ == '__main__':
_test()
import types
class Tracing:
def __init__(self, name, bases, namespace):
"""Create a new class."""
self.__name__ = name
self.__bases__ = bases
self.__namespace__ = namespace
def __call__(self):
"""Create a new instance."""
return Instance(self)
class Instance:
def __init__(self, klass):
self.__klass__ = klass
def __getattr__(self, name):
try:
value = self.__klass__.__namespace__[name]
except KeyError:
raise AttributeError(name)
if type(value) is not types.FunctionType:
return value
return BoundMethod(value, self)
class BoundMethod:
def __init__(self, function, instance):
self.function = function
self.instance = instance
def __call__(self, *args):
print("calling", self.function, "for", self.instance, "with", args)
return self.function(self.instance, *args)
Trace = Tracing('Trace', (), {})
class MyTracedClass(Trace):
def method1(self, a):
self.a = a
def method2(self):
return self.a
aninstance = MyTracedClass()
aninstance.method1(10)
print(aninstance.method2())
"""Synchronization metaclass.
This metaclass makes it possible to declare synchronized methods.
"""
import _thread as thread
# First we need to define a reentrant lock.
# This is generally useful and should probably be in a standard Python
# library module. For now, we in-line it.
class Lock:
"""Reentrant lock.
This is a mutex-like object which can be acquired by the same
thread more than once. It keeps a reference count of the number
of times it has been acquired by the same thread. Each acquire()
call must be matched by a release() call and only the last
release() call actually releases the lock for acquisition by
another thread.
The implementation uses two locks internally:
__mutex is a short term lock used to protect the instance variables
__wait is the lock for which other threads wait
A thread intending to acquire both locks should acquire __wait
first.
The implementation uses two other instance variables, protected by
locking __mutex:
__tid is the thread ID of the thread that currently has the lock
__count is the number of times the current thread has acquired it
When the lock is released, __tid is None and __count is zero.
"""
def __init__(self):
"""Constructor. Initialize all instance variables."""
self.__mutex = thread.allocate_lock()
self.__wait = thread.allocate_lock()
self.__tid = None
self.__count = 0
def acquire(self, flag=1):
"""Acquire the lock.
If the optional flag argument is false, returns immediately
when it cannot acquire the __wait lock without blocking (it
may still block for a little while in order to acquire the
__mutex lock).
The return value is only relevant when the flag argument is
false; it is 1 if the lock is acquired, 0 if not.
"""
self.__mutex.acquire()
try:
if self.__tid == thread.get_ident():
self.__count = self.__count + 1
return 1
finally:
self.__mutex.release()
locked = self.__wait.acquire(flag)
if not flag and not locked:
return 0
try:
self.__mutex.acquire()
assert self.__tid == None
assert self.__count == 0
self.__tid = thread.get_ident()
self.__count = 1
return 1
finally:
self.__mutex.release()
def release(self):
"""Release the lock.
If this thread doesn't currently have the lock, an assertion
error is raised.
Only allow another thread to acquire the lock when the count
reaches zero after decrementing it.
"""
self.__mutex.acquire()
try:
assert self.__tid == thread.get_ident()
assert self.__count > 0
self.__count = self.__count - 1
if self.__count == 0:
self.__tid = None
self.__wait.release()
finally:
self.__mutex.release()
def _testLock():
done = []
def f2(lock, done=done):
lock.acquire()
print("f2 running in thread %d\n" % thread.get_ident(), end=' ')
lock.release()
done.append(1)
def f1(lock, f2=f2, done=done):
lock.acquire()
print("f1 running in thread %d\n" % thread.get_ident(), end=' ')
try:
f2(lock)
finally:
lock.release()
done.append(1)
lock = Lock()
lock.acquire()
f1(lock) # Adds 2 to done
lock.release()
lock.acquire()
thread.start_new_thread(f1, (lock,)) # Adds 2
thread.start_new_thread(f1, (lock, f1)) # Adds 3
thread.start_new_thread(f2, (lock,)) # Adds 1
thread.start_new_thread(f2, (lock,)) # Adds 1
lock.release()
import time
while len(done) < 9:
print(len(done))
time.sleep(0.001)
print(len(done))
# Now, the Locking metaclass is a piece of cake.
# As an example feature, methods whose name begins with exactly one
# underscore are not synchronized.
from Meta import MetaClass, MetaHelper, MetaMethodWrapper
class LockingMethodWrapper(MetaMethodWrapper):
def __call__(self, *args, **kw):
if self.__name__[:1] == '_' and self.__name__[1:] != '_':
return self.func(self.inst, *args, **kw)
self.inst.__lock__.acquire()
try:
return self.func(self.inst, *args, **kw)
finally:
self.inst.__lock__.release()
class LockingHelper(MetaHelper):
__methodwrapper__ = LockingMethodWrapper
def __helperinit__(self, formalclass):
MetaHelper.__helperinit__(self, formalclass)
self.__lock__ = Lock()
class LockingMetaClass(MetaClass):
__helper__ = LockingHelper
Locking = LockingMetaClass('Locking', (), {})
def _test():
# For kicks, take away the Locking base class and see it die
class Buffer(Locking):
def __init__(self, initialsize):
assert initialsize > 0
self.size = initialsize
self.buffer = [None]*self.size
self.first = self.last = 0
def put(self, item):
# Do we need to grow the buffer?
if (self.last+1) % self.size != self.first:
# Insert the new item
self.buffer[self.last] = item
self.last = (self.last+1) % self.size
return
# Double the buffer size
# First normalize it so that first==0 and last==size-1
print("buffer =", self.buffer)
print("first = %d, last = %d, size = %d" % (
self.first, self.last, self.size))
if self.first <= self.last:
temp = self.buffer[self.first:self.last]
else:
temp = self.buffer[self.first:] + self.buffer[:self.last]
print("temp =", temp)
self.buffer = temp + [None]*(self.size+1)
self.first = 0
self.last = self.size-1
self.size = self.size*2
print("Buffer size doubled to", self.size)
print("new buffer =", self.buffer)
print("first = %d, last = %d, size = %d" % (
self.first, self.last, self.size))
self.put(item) # Recursive call to test the locking
def get(self):
# Is the buffer empty?
if self.first == self.last:
raise EOFError # Avoid defining a new exception
item = self.buffer[self.first]
self.first = (self.first+1) % self.size
return item
def producer(buffer, wait, n=1000):
import time
i = 0
while i < n:
print("put", i)
buffer.put(i)
i = i+1
print("Producer: done producing", n, "items")
wait.release()
def consumer(buffer, wait, n=1000):
import time
i = 0
tout = 0.001
while i < n:
try:
x = buffer.get()
if x != i:
raise AssertionError("get() returned %s, expected %s" % (x, i))
print("got", i)
i = i+1
tout = 0.001
except EOFError:
time.sleep(tout)
tout = tout*2
print("Consumer: done consuming", n, "items")
wait.release()
pwait = thread.allocate_lock()
pwait.acquire()
cwait = thread.allocate_lock()
cwait.acquire()
buffer = Buffer(1)
n = 1000
thread.start_new_thread(consumer, (buffer, cwait, n))
thread.start_new_thread(producer, (buffer, pwait, n))
pwait.acquire()
print("Producer done")
cwait.acquire()
print("All done")
print("buffer size ==", len(buffer.buffer))
if __name__ == '__main__':
_testLock()
_test()
"""Tracing metaclass.
XXX This is very much a work in progress.
"""
import types, sys
class TraceMetaClass:
"""Metaclass for tracing.
Classes defined using this metaclass have an automatic tracing
feature -- by setting the __trace_output__ instance (or class)
variable to a file object, trace messages about all calls are
written to the file. The trace formatting can be changed by
defining a suitable __trace_call__ method.
"""
__inited = 0
def __init__(self, name, bases, dict):
self.__name__ = name
self.__bases__ = bases
self.__dict = dict
# XXX Can't define __dict__, alas
self.__inited = 1
def __getattr__(self, name):
try:
return self.__dict[name]
except KeyError:
for base in self.__bases__:
try:
return base.__getattr__(name)
except AttributeError:
pass
raise AttributeError(name)
def __setattr__(self, name, value):
if not self.__inited:
self.__dict__[name] = value
else:
self.__dict[name] = value
def __call__(self, *args, **kw):
inst = TracingInstance()
inst.__meta_init__(self)
try:
init = inst.__getattr__('__init__')
except AttributeError:
init = lambda: None
init(*args, **kw)
return inst
__trace_output__ = None
class TracingInstance:
"""Helper class to represent an instance of a tracing class."""
def __trace_call__(self, fp, fmt, *args):
fp.write((fmt+'\n') % args)
def __meta_init__(self, klass):
self.__class = klass
def __getattr__(self, name):
# Invoked for any attr not in the instance's __dict__
try:
raw = self.__class.__getattr__(name)
except AttributeError:
raise AttributeError(name)
if type(raw) != types.FunctionType:
return raw
# It's a function
fullname = self.__class.__name__ + "." + name
if not self.__trace_output__ or name == '__trace_call__':
return NotTracingWrapper(fullname, raw, self)
else:
return TracingWrapper(fullname, raw, self)
class NotTracingWrapper:
def __init__(self, name, func, inst):
self.__name__ = name
self.func = func
self.inst = inst
def __call__(self, *args, **kw):
return self.func(self.inst, *args, **kw)
class TracingWrapper(NotTracingWrapper):
def __call__(self, *args, **kw):
self.inst.__trace_call__(self.inst.__trace_output__,
"calling %s, inst=%s, args=%s, kw=%s",
self.__name__, self.inst, args, kw)
try:
rv = self.func(self.inst, *args, **kw)
except:
t, v, tb = sys.exc_info()
self.inst.__trace_call__(self.inst.__trace_output__,
"returning from %s with exception %s: %s",
self.__name__, t, v)
raise t(v).with_traceback(tb)
else:
self.inst.__trace_call__(self.inst.__trace_output__,
"returning from %s with value %s",
self.__name__, rv)
return rv
Traced = TraceMetaClass('Traced', (), {'__trace_output__': None})
def _test():
global C, D
class C(Traced):
def __init__(self, x=0): self.x = x
def m1(self, x): self.x = x
def m2(self, y): return self.x + y
__trace_output__ = sys.stdout
class D(C):
def m2(self, y): print("D.m2(%r)" % (y,)); return C.m2(self, y)
__trace_output__ = None
x = C(4321)
print(x)
print(x.x)
print(x.m1(100))
print(x.m1(10))
print(x.m2(33))
print(x.m1(5))
print(x.m2(4000))
print(x.x)
print(C.__init__)
print(C.m2)
print(D.__init__)
print(D.m2)
y = D()
print(y)
print(y.m1(10))
print(y.m2(100))
print(y.x)
if __name__ == '__main__':
_test()
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment