Commit 56baca32 authored by Guido van Rossum's avatar Guido van Rossum

Removing DOS 8x3 support

parent f6fc8758
This diff is collapsed.
"""Bastionification utility.
A bastion (for another object -- the 'original') is an object that has
the same methods as the original but does not give access to its
instance variables. Bastions have a number of uses, but the most
obvious one is to provide code executing in restricted mode with a
safe interface to an object implemented in unrestricted mode.
The bastionification routine has an optional second argument which is
a filter function. Only those methods for which the filter method
(called with the method name as argument) returns true are accessible.
The default filter method returns true unless the method name begins
with an underscore.
There are a number of possible implementations of bastions. We use a
'lazy' approach where the bastion's __getattr__() discipline does all
the work for a particular method the first time it is used. This is
usually fastest, especially if the user doesn't call all available
methods. The retrieved methods are stored as instance variables of
the bastion, so the overhead is only occurred on the first use of each
method.
Detail: the bastion class has a __repr__() discipline which includes
the repr() of the original object. This is precomputed when the
bastion is created.
"""
from types import MethodType
class BastionClass:
"""Helper class used by the Bastion() function.
You could subclass this and pass the subclass as the bastionclass
argument to the Bastion() function, as long as the constructor has
the same signature (a get() function and a name for the object).
"""
def __init__(self, get, name):
"""Constructor.
Arguments:
get - a function that gets the attribute value (by name)
name - a human-readable name for the original object
(suggestion: use repr(object))
"""
self._get_ = get
self._name_ = name
def __repr__(self):
"""Return a representation string.
This includes the name passed in to the constructor, so that
if you print the bastion during debugging, at least you have
some idea of what it is.
"""
return "<Bastion for %s>" % self._name_
def __getattr__(self, name):
"""Get an as-yet undefined attribute value.
This calls the get() function that was passed to the
constructor. The result is stored as an instance variable so
that the next time the same attribute is requested,
__getattr__() won't be invoked.
If the get() function raises an exception, this is simply
passed on -- exceptions are not cached.
"""
attribute = self._get_(name)
self.__dict__[name] = attribute
return attribute
def Bastion(object, filter = lambda name: name[:1] != '_',
name=None, bastionclass=BastionClass):
"""Create a bastion for an object, using an optional filter.
See the Bastion module's documentation for background.
Arguments:
object - the original object
filter - a predicate that decides whether a function name is OK;
by default all names are OK that don't start with '_'
name - the name of the object; default repr(object)
bastionclass - class used to create the bastion; default BastionClass
"""
# Note: we define *two* ad-hoc functions here, get1 and get2.
# Both are intended to be called in the same way: get(name).
# It is clear that the real work (getting the attribute
# from the object and calling the filter) is done in get1.
# Why can't we pass get1 to the bastion? Because the user
# would be able to override the filter argument! With get2,
# overriding the default argument is no security loophole:
# all it does is call it.
# Also notice that we can't place the object and filter as
# instance variables on the bastion object itself, since
# the user has full access to all instance variables!
def get1(name, object=object, filter=filter):
"""Internal function for Bastion(). See source comments."""
if filter(name):
attribute = getattr(object, name)
if type(attribute) == MethodType:
return attribute
raise AttributeError, name
def get2(name, get1=get1):
"""Internal function for Bastion(). See source comments."""
return get1(name)
if name is None:
name = `object`
return bastionclass(get2, name)
def _test():
"""Test the Bastion() function."""
class Original:
def __init__(self):
self.sum = 0
def add(self, n):
self._add(n)
def _add(self, n):
self.sum = self.sum + n
def total(self):
return self.sum
o = Original()
b = Bastion(o)
testcode = """if 1:
b.add(81)
b.add(18)
print "b.total() =", b.total()
try:
print "b.sum =", b.sum,
except:
print "inaccessible"
else:
print "accessible"
try:
print "b._add =", b._add,
except:
print "inaccessible"
else:
print "accessible"
try:
print "b._get_.func_defaults =", b._get_.func_defaults,
except:
print "inaccessible"
else:
print "accessible"
\n"""
exec testcode
print '='*20, "Using rexec:", '='*20
import rexec
r = rexec.RExec()
m = r.add_module('__main__')
m.b = b
r.r_exec(testcode)
if __name__ == '__main__':
_test()
This diff is collapsed.
"""Module/script to "compile" all .py files to .pyc (or .pyo) file.
When called as a script with arguments, this compiles the directories
given as arguments recursively; the -l option prevents it from
recursing into directories.
Without arguments, if compiles all modules on sys.path, without
recursing into subdirectories. (Even though it should do so for
packages -- for now, you'll have to deal with packages separately.)
See module py_compile for details of the actual byte-compilation.
"""
import os
import stat
import sys
import py_compile
def compile_dir(dir, maxlevels=10, ddir=None, force=0):
"""Byte-compile all modules in the given directory tree.
Arguments (only dir is required):
dir: the directory to byte-compile
maxlevels: maximum recursion level (default 10)
ddir: if given, purported directory name (this is the
directory name that will show up in error messages)
force: if 1, force compilation, even if timestamps are up-to-date
"""
print 'Listing', dir, '...'
try:
names = os.listdir(dir)
except os.error:
print "Can't list", dir
names = []
names.sort()
success = 1
for name in names:
fullname = os.path.join(dir, name)
if ddir:
dfile = os.path.join(ddir, name)
else:
dfile = None
if os.path.isfile(fullname):
head, tail = name[:-3], name[-3:]
if tail == '.py':
cfile = fullname + (__debug__ and 'c' or 'o')
ftime = os.stat(fullname)[stat.ST_MTIME]
try: ctime = os.stat(cfile)[stat.ST_MTIME]
except os.error: ctime = 0
if (ctime > ftime) and not force: continue
print 'Compiling', fullname, '...'
try:
py_compile.compile(fullname, None, dfile)
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
if type(sys.exc_type) == type(''):
exc_type_name = sys.exc_type
else: exc_type_name = sys.exc_type.__name__
print 'Sorry:', exc_type_name + ':',
print sys.exc_value
success = 0
elif maxlevels > 0 and \
name != os.curdir and name != os.pardir and \
os.path.isdir(fullname) and \
not os.path.islink(fullname):
compile_dir(fullname, maxlevels - 1, dfile, force)
return success
def compile_path(skip_curdir=1, maxlevels=0, force=0):
"""Byte-compile all module on sys.path.
Arguments (all optional):
skip_curdir: if true, skip current directory (default true)
maxlevels: max recursion level (default 0)
force: as for compile_dir() (default 0)
"""
success = 1
for dir in sys.path:
if (not dir or dir == os.curdir) and skip_curdir:
print 'Skipping current directory'
else:
success = success and compile_dir(dir, maxlevels, None, force)
return success
def main():
"""Script main program."""
import getopt
try:
opts, args = getopt.getopt(sys.argv[1:], 'lfd:')
except getopt.error, msg:
print msg
print "usage: compileall [-l] [-f] [-d destdir] [directory ...]"
print "-l: don't recurse down"
print "-f: force rebuild even if timestamps are up-to-date"
print "-d destdir: purported directory name for error messages"
print "if no directory arguments, -l sys.path is assumed"
sys.exit(2)
maxlevels = 10
ddir = None
force = 0
for o, a in opts:
if o == '-l': maxlevels = 0
if o == '-d': ddir = a
if o == '-f': force = 1
if ddir:
if len(args) != 1:
print "-d destdir require exactly one directory argument"
sys.exit(2)
success = 1
try:
if args:
for dir in args:
success = success and compile_dir(dir, maxlevels, ddir, force)
else:
success = compile_path()
except KeyboardInterrupt:
print "\n[interrupt]"
success = 0
return success
if __name__ == '__main__':
sys.exit(not main())
This diff is collapsed.
This diff is collapsed.
"""Helper class to quickly write a loop over all standard input files.
Typical use is:
import fileinput
for line in fileinput.input():
process(line)
This iterates over the lines of all files listed in sys.argv[1:],
defaulting to sys.stdin if the list is empty. If a filename is '-' it
is also replaced by sys.stdin. To specify an alternative list of
filenames, pass it as the argument to input(). A single file name is
also allowed.
Functions filename(), lineno() return the filename and cumulative line
number of the line that has just been read; filelineno() returns its
line number in the current file; isfirstline() returns true iff the
line just read is the first line of its file; isstdin() returns true
iff the line was read from sys.stdin. Function nextfile() closes the
current file so that the next iteration will read the first line from
the next file (if any); lines not read from the file will not count
towards the cumulative line count; the filename is not changed until
after the first line of the next file has been read. Function close()
closes the sequence.
Before any lines have been read, filename() returns None and both line
numbers are zero; nextfile() has no effect. After all lines have been
read, filename() and the line number functions return the values
pertaining to the last line read; nextfile() has no effect.
All files are opened in text mode. If an I/O error occurs during
opening or reading a file, the IOError exception is raised.
If sys.stdin is used more than once, the second and further use will
return no lines, except perhaps for interactive use, or if it has been
explicitly reset (e.g. using sys.stdin.seek(0)).
Empty files are opened and immediately closed; the only time their
presence in the list of filenames is noticeable at all is when the
last file opened is empty.
It is possible that the last line of a file doesn't end in a newline
character; otherwise lines are returned including the trailing
newline.
Class FileInput is the implementation; its methods filename(),
lineno(), fileline(), isfirstline(), isstdin(), nextfile() and close()
correspond to the functions in the module. In addition it has a
readline() method which returns the next input line, and a
__getitem__() method which implements the sequence behavior. The
sequence must be accessed in strictly sequential order; sequence
access and readline() cannot be mixed.
Optional in-place filtering: if the keyword argument inplace=1 is
passed to input() or to the FileInput constructor, the file is moved
to a backup file and standard output is directed to the input file.
This makes it possible to write a filter that rewrites its input file
in place. If the keyword argument backup=".<some extension>" is also
given, it specifies the extension for the backup file, and the backup
file remains around; by default, the extension is ".bak" and it is
deleted when the output file is closed. In-place filtering is
disabled when standard input is read. XXX The current implementation
does not work for MS-DOS 8+3 filesystems.
XXX Possible additions:
- optional getopt argument processing
- specify open mode ('r' or 'rb')
- specify buffer size
- fileno()
- isatty()
- read(), read(size), even readlines()
"""
import sys, os, stat
_state = None
def input(files=None, inplace=0, backup=""):
global _state
if _state and _state._file:
raise RuntimeError, "input() already active"
_state = FileInput(files, inplace, backup)
return _state
def close():
global _state
state = _state
_state = None
if state:
state.close()
def nextfile():
if not _state:
raise RuntimeError, "no active input()"
return _state.nextfile()
def filename():
if not _state:
raise RuntimeError, "no active input()"
return _state.filename()
def lineno():
if not _state:
raise RuntimeError, "no active input()"
return _state.lineno()
def filelineno():
if not _state:
raise RuntimeError, "no active input()"
return _state.filelineno()
def isfirstline():
if not _state:
raise RuntimeError, "no active input()"
return _state.isfirstline()
def isstdin():
if not _state:
raise RuntimeError, "no active input()"
return _state.isstdin()
class FileInput:
def __init__(self, files=None, inplace=0, backup=""):
if type(files) == type(''):
files = (files,)
else:
if files is None:
files = sys.argv[1:]
if not files:
files = ('-',)
else:
files = tuple(files)
self._files = files
self._inplace = inplace
self._backup = backup
self._savestdout = None
self._output = None
self._filename = None
self._lineno = 0
self._filelineno = 0
self._file = None
self._isstdin = 0
self._backupfilename = None
def __del__(self):
self.close()
def close(self):
self.nextfile()
self._files = ()
def __getitem__(self, i):
if i != self._lineno:
raise RuntimeError, "accessing lines out of order"
line = self.readline()
if not line:
raise IndexError, "end of input reached"
return line
def nextfile(self):
savestdout = self._savestdout
self._savestdout = 0
if savestdout:
sys.stdout = savestdout
output = self._output
self._output = 0
if output:
output.close()
file = self._file
self._file = 0
if file and not self._isstdin:
file.close()
backupfilename = self._backupfilename
self._backupfilename = 0
if backupfilename and not self._backup:
try: os.unlink(backupfilename)
except: pass
self._isstdin = 0
def readline(self):
if not self._file:
if not self._files:
return ""
self._filename = self._files[0]
self._files = self._files[1:]
self._filelineno = 0
self._file = None
self._isstdin = 0
self._backupfilename = 0
if self._filename == '-':
self._filename = '<stdin>'
self._file = sys.stdin
self._isstdin = 1
else:
if self._inplace:
self._backupfilename = (
self._filename + (self._backup or ".bak"))
try: os.unlink(self._backupfilename)
except os.error: pass
# The next few lines may raise IOError
os.rename(self._filename, self._backupfilename)
self._file = open(self._backupfilename, "r")
try:
perm = os.fstat(self._file.fileno())[stat.ST_MODE]
except:
self._output = open(self._filename, "w")
else:
fd = os.open(self._filename,
os.O_CREAT | os.O_WRONLY | os.O_TRUNC,
perm)
self._output = os.fdopen(fd, "w")
try:
os.chmod(self._filename, perm)
except:
pass
self._savestdout = sys.stdout
sys.stdout = self._output
else:
# This may raise IOError
self._file = open(self._filename, "r")
line = self._file.readline()
if line:
self._lineno = self._lineno + 1
self._filelineno = self._filelineno + 1
return line
self.nextfile()
# Recursive call
return self.readline()
def filename(self):
return self._filename
def lineno(self):
return self._lineno
def filelineno(self):
return self._filelineno
def isfirstline(self):
return self._filelineno == 1
def isstdin(self):
return self._isstdin
def _test():
import getopt
inplace = 0
backup = 0
opts, args = getopt.getopt(sys.argv[1:], "ib:")
for o, a in opts:
if o == '-i': inplace = 1
if o == '-b': backup = a
for line in input(args, inplace=inplace, backup=backup):
if line[-1:] == '\n': line = line[:-1]
if line[-1:] == '\r': line = line[:-1]
print "%d: %s[%d]%s %s" % (lineno(), filename(), filelineno(),
isfirstline() and "*" or "", line)
print "%d: %s[%d]" % (lineno(), filename(), filelineno())
if __name__ == '__main__':
_test()
This diff is collapsed.
"""Gopher protocol client interface."""
import string
# Default selector, host and port
DEF_SELECTOR = '1/'
DEF_HOST = 'gopher.micro.umn.edu'
DEF_PORT = 70
# Recognized file types
A_TEXT = '0'
A_MENU = '1'
A_CSO = '2'
A_ERROR = '3'
A_MACBINHEX = '4'
A_PCBINHEX = '5'
A_UUENCODED = '6'
A_INDEX = '7'
A_TELNET = '8'
A_BINARY = '9'
A_DUPLICATE = '+'
A_SOUND = 's'
A_EVENT = 'e'
A_CALENDAR = 'c'
A_HTML = 'h'
A_TN3270 = 'T'
A_MIME = 'M'
A_IMAGE = 'I'
A_WHOIS = 'w'
A_QUERY = 'q'
A_GIF = 'g'
A_HTML = 'h' # HTML file
A_WWW = 'w' # WWW address
A_PLUS_IMAGE = ':'
A_PLUS_MOVIE = ';'
A_PLUS_SOUND = '<'
_names = dir()
_type_to_name_map = {}
def type_to_name(gtype):
"""Map all file types to strings; unknown types become TYPE='x'."""
global _type_to_name_map
if _type_to_name_map=={}:
for name in _names:
if name[:2] == 'A_':
_type_to_name_map[eval(name)] = name[2:]
if _type_to_name_map.has_key(gtype):
return _type_to_name_map[gtype]
return 'TYPE=' + `gtype`
# Names for characters and strings
CRLF = '\r\n'
TAB = '\t'
def send_selector(selector, host, port = 0):
"""Send a selector to a given host and port, return a file with the reply."""
import socket
import string
if not port:
i = string.find(host, ':')
if i >= 0:
host, port = host[:i], string.atoi(host[i+1:])
if not port:
port = DEF_PORT
elif type(port) == type(''):
port = string.atoi(port)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
s.send(selector + CRLF)
s.shutdown(1)
return s.makefile('rb')
def send_query(selector, query, host, port = 0):
"""Send a selector and a query string."""
return send_selector(selector + '\t' + query, host, port)
def path_to_selector(path):
"""Takes a path as returned by urlparse and returns the appropriate selector."""
if path=="/":
return "/"
else:
return path[2:] # Cuts initial slash and data type identifier
def path_to_datatype_name(path):
"""Takes a path as returned by urlparse and maps it to a string.
See section 3.4 of RFC 1738 for details."""
if path=="/":
# No way to tell, although "INDEX" is likely
return "TYPE='unknown'"
else:
return type_to_name(path[1])
# The following functions interpret the data returned by the gopher
# server according to the expected type, e.g. textfile or directory
def get_directory(f):
"""Get a directory in the form of a list of entries."""
import string
list = []
while 1:
line = f.readline()
if not line:
print '(Unexpected EOF from server)'
break
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] in CRLF:
line = line[:-1]
if line == '.':
break
if not line:
print '(Empty line from server)'
continue
gtype = line[0]
parts = string.splitfields(line[1:], TAB)
if len(parts) < 4:
print '(Bad line from server:', `line`, ')'
continue
if len(parts) > 4:
if parts[4:] != ['+']:
print '(Extra info from server:',
print parts[4:], ')'
else:
parts.append('')
parts.insert(0, gtype)
list.append(parts)
return list
def get_textfile(f):
"""Get a text file as a list of lines, with trailing CRLF stripped."""
list = []
get_alt_textfile(f, list.append)
return list
def get_alt_textfile(f, func):
"""Get a text file and pass each line to a function, with trailing CRLF stripped."""
while 1:
line = f.readline()
if not line:
print '(Unexpected EOF from server)'
break
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] in CRLF:
line = line[:-1]
if line == '.':
break
if line[:2] == '..':
line = line[1:]
func(line)
def get_binary(f):
"""Get a binary file as one solid data block."""
data = f.read()
return data
def get_alt_binary(f, func, blocksize):
"""Get a binary file and pass each block to a function."""
while 1:
data = f.read(blocksize)
if not data:
break
func(data)
def test():
"""Trivial test program."""
import sys
import getopt
opts, args = getopt.getopt(sys.argv[1:], '')
selector = DEF_SELECTOR
type = selector[0]
host = DEF_HOST
port = DEF_PORT
if args:
host = args[0]
args = args[1:]
if args:
type = args[0]
args = args[1:]
if len(type) > 1:
type, selector = type[0], type
else:
selector = ''
if args:
selector = args[0]
args = args[1:]
query = ''
if args:
query = args[0]
args = args[1:]
if type == A_INDEX:
f = send_query(selector, query, host)
else:
f = send_selector(selector, host)
if type == A_TEXT:
list = get_textfile(f)
for item in list: print item
elif type in (A_MENU, A_INDEX):
list = get_directory(f)
for item in list: print item
else:
data = get_binary(f)
print 'binary data:', len(data), 'bytes:', `data[:100]`[:40]
# Run the test when run as script
if __name__ == '__main__':
test()
This diff is collapsed.
"""Cache lines from files.
This is intended to read lines from modules imported -- hence if a filename
is not found, it will look down the module search path for a file by
that name.
"""
import sys
import os
from stat import *
def getline(filename, lineno):
lines = getlines(filename)
if 1 <= lineno <= len(lines):
return lines[lineno-1]
else:
return ''
# The cache
cache = {} # The cache
def clearcache():
"""Clear the cache entirely."""
global cache
cache = {}
def getlines(filename):
"""Get the lines for a file from the cache.
Update the cache if it doesn't contain an entry for this file already."""
if cache.has_key(filename):
return cache[filename][2]
else:
return updatecache(filename)
def checkcache():
"""Discard cache entries that are out of date.
(This is not checked upon each call!)"""
for filename in cache.keys():
size, mtime, lines, fullname = cache[filename]
try:
stat = os.stat(fullname)
except os.error:
del cache[filename]
continue
if size <> stat[ST_SIZE] or mtime <> stat[ST_MTIME]:
del cache[filename]
def updatecache(filename):
"""Update a cache entry and return its list of lines.
If something's wrong, print a message, discard the cache entry,
and return an empty list."""
if cache.has_key(filename):
del cache[filename]
if not filename or filename[0] + filename[-1] == '<>':
return []
fullname = filename
try:
stat = os.stat(fullname)
except os.error, msg:
# Try looking through the module search path
basename = os.path.split(filename)[1]
for dirname in sys.path:
fullname = os.path.join(dirname, basename)
try:
stat = os.stat(fullname)
break
except os.error:
pass
else:
# No luck
## print '*** Cannot stat', filename, ':', msg
return []
try:
fp = open(fullname, 'r')
lines = fp.readlines()
fp.close()
except IOError, msg:
## print '*** Cannot open', fullname, ':', msg
return []
size, mtime = stat[ST_SIZE], stat[ST_MTIME]
cache[filename] = size, mtime, lines, fullname
return lines
"""Macintosh-specific module for conversion between pathnames and URLs.
Do not import directly; use urllib instead."""
import string
import urllib
import os
def url2pathname(pathname):
"Convert /-delimited pathname to mac pathname"
#
# XXXX The .. handling should be fixed...
#
tp = urllib.splittype(pathname)[0]
if tp and tp <> 'file':
raise RuntimeError, 'Cannot convert non-local URL to pathname'
# Turn starting /// into /, an empty hostname means current host
if pathname[:3] == '///':
pathname = pathname[2:]
elif pathname[:2] == '//':
raise RuntimeError, 'Cannot convert non-local URL to pathname'
components = string.split(pathname, '/')
# Remove . and embedded ..
i = 0
while i < len(components):
if components[i] == '.':
del components[i]
elif components[i] == '..' and i > 0 and \
components[i-1] not in ('', '..'):
del components[i-1:i+1]
i = i-1
elif components[i] == '' and i > 0 and components[i-1] <> '':
del components[i]
else:
i = i+1
if not components[0]:
# Absolute unix path, don't start with colon
rv = string.join(components[1:], ':')
else:
# relative unix path, start with colon. First replace
# leading .. by empty strings (giving ::file)
i = 0
while i < len(components) and components[i] == '..':
components[i] = ''
i = i + 1
rv = ':' + string.join(components, ':')
# and finally unquote slashes and other funny characters
return urllib.unquote(rv)
def pathname2url(pathname):
"convert mac pathname to /-delimited pathname"
if '/' in pathname:
raise RuntimeError, "Cannot convert pathname containing slashes"
components = string.split(pathname, ':')
# Remove empty first and/or last component
if components[0] == '':
del components[0]
if components[-1] == '':
del components[-1]
# Replace empty string ('::') by .. (will result in '/../' later)
for i in range(len(components)):
if components[i] == '':
components[i] = '..'
# Truncate names longer than 31 bytes
components = map(_pncomp2url, components)
if os.path.isabs(pathname):
return '/' + string.join(components, '/')
else:
return string.join(components, '/')
def _pncomp2url(component):
component = urllib.quote(component[:31], safe='') # We want to quote slashes
return component
def test():
for url in ["index.html",
"bar/index.html",
"/foo/bar/index.html",
"/foo/bar/",
"/"]:
print `url`, '->', `url2pathname(url)`
for path in ["drive:",
"drive:dir:",
"drive:dir:file",
"drive:file",
"file",
":file",
":dir:",
":dir:file"]:
print `path`, '->', `pathname2url(path)`
if __name__ == '__main__':
test()
"""Various tools used by MIME-reading or MIME-writing programs."""
import os
import rfc822
import string
import tempfile
class Message(rfc822.Message):
"""A derived class of rfc822.Message that knows about MIME headers and
contains some hooks for decoding encoded and multipart messages."""
def __init__(self, fp, seekable = 1):
rfc822.Message.__init__(self, fp, seekable)
self.encodingheader = \
self.getheader('content-transfer-encoding')
self.typeheader = \
self.getheader('content-type')
self.parsetype()
self.parseplist()
def parsetype(self):
str = self.typeheader
if str == None:
str = 'text/plain'
if ';' in str:
i = string.index(str, ';')
self.plisttext = str[i:]
str = str[:i]
else:
self.plisttext = ''
fields = string.splitfields(str, '/')
for i in range(len(fields)):
fields[i] = string.lower(string.strip(fields[i]))
self.type = string.joinfields(fields, '/')
self.maintype = fields[0]
self.subtype = string.joinfields(fields[1:], '/')
def parseplist(self):
str = self.plisttext
self.plist = []
while str[:1] == ';':
str = str[1:]
if ';' in str:
# XXX Should parse quotes!
end = string.index(str, ';')
else:
end = len(str)
f = str[:end]
if '=' in f:
i = string.index(f, '=')
f = string.lower(string.strip(f[:i])) + \
'=' + string.strip(f[i+1:])
self.plist.append(string.strip(f))
str = str[end:]
def getplist(self):
return self.plist
def getparam(self, name):
name = string.lower(name) + '='
n = len(name)
for p in self.plist:
if p[:n] == name:
return rfc822.unquote(p[n:])
return None
def getparamnames(self):
result = []
for p in self.plist:
i = string.find(p, '=')
if i >= 0:
result.append(string.lower(p[:i]))
return result
def getencoding(self):
if self.encodingheader == None:
return '7bit'
return string.lower(self.encodingheader)
def gettype(self):
return self.type
def getmaintype(self):
return self.maintype
def getsubtype(self):
return self.subtype
# Utility functions
# -----------------
_prefix = None
def choose_boundary():
"""Return a random string usable as a multipart boundary.
The method used is so that it is *very* unlikely that the same
string of characters will every occur again in the Universe,
so the caller needn't check the data it is packing for the
occurrence of the boundary.
The boundary contains dots so you have to quote it in the header."""
global _prefix
import time
import random
if _prefix == None:
import socket
import os
hostid = socket.gethostbyname(socket.gethostname())
try:
uid = `os.getuid()`
except:
uid = '1'
try:
pid = `os.getpid()`
except:
pid = '1'
_prefix = hostid + '.' + uid + '.' + pid
timestamp = '%.3f' % time.time()
seed = `random.randint(0, 32767)`
return _prefix + '.' + timestamp + '.' + seed
# Subroutines for decoding some common content-transfer-types
def decode(input, output, encoding):
"""Decode common content-transfer-encodings (base64, quopri, uuencode)."""
if encoding == 'base64':
import base64
return base64.decode(input, output)
if encoding == 'quoted-printable':
import quopri
return quopri.decode(input, output)
if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
import uu
return uu.decode(input, output)
if encoding in ('7bit', '8bit'):
return output.write(input.read())
if decodetab.has_key(encoding):
pipethrough(input, decodetab[encoding], output)
else:
raise ValueError, \
'unknown Content-Transfer-Encoding: %s' % encoding
def encode(input, output, encoding):
"""Encode common content-transfer-encodings (base64, quopri, uuencode)."""
if encoding == 'base64':
import base64
return base64.encode(input, output)
if encoding == 'quoted-printable':
import quopri
return quopri.encode(input, output, 0)
if encoding in ('uuencode', 'x-uuencode', 'uue', 'x-uue'):
import uu
return uu.encode(input, output)
if encoding in ('7bit', '8bit'):
return output.write(input.read())
if encodetab.has_key(encoding):
pipethrough(input, encodetab[encoding], output)
else:
raise ValueError, \
'unknown Content-Transfer-Encoding: %s' % encoding
# The following is no longer used for standard encodings
# XXX This requires that uudecode and mmencode are in $PATH
uudecode_pipe = '''(
TEMP=/tmp/@uu.$$
sed "s%^begin [0-7][0-7]* .*%begin 600 $TEMP%" | uudecode
cat $TEMP
rm $TEMP
)'''
decodetab = {
'uuencode': uudecode_pipe,
'x-uuencode': uudecode_pipe,
'uue': uudecode_pipe,
'x-uue': uudecode_pipe,
'quoted-printable': 'mmencode -u -q',
'base64': 'mmencode -u -b',
}
encodetab = {
'x-uuencode': 'uuencode tempfile',
'uuencode': 'uuencode tempfile',
'x-uue': 'uuencode tempfile',
'uue': 'uuencode tempfile',
'quoted-printable': 'mmencode -q',
'base64': 'mmencode -b',
}
def pipeto(input, command):
pipe = os.popen(command, 'w')
copyliteral(input, pipe)
pipe.close()
def pipethrough(input, command, output):
tempname = tempfile.mktemp()
try:
temp = open(tempname, 'w')
except IOError:
print '*** Cannot create temp file', `tempname`
return
copyliteral(input, temp)
temp.close()
pipe = os.popen(command + ' <' + tempname, 'r')
copybinary(pipe, output)
pipe.close()
os.unlink(tempname)
def copyliteral(input, output):
while 1:
line = input.readline()
if not line: break
output.write(line)
def copybinary(input, output):
BUFSIZE = 8192
while 1:
line = input.read(BUFSIZE)
if not line: break
output.write(line)
"""Guess the MIME type of a file.
This module defines two useful functions:
guess_type(url) -- guess the MIME type and encoding of a URL.
guess_extension(type) -- guess the extension for a given MIME type.
It also contains the following, for tuning the behavior:
Data:
knownfiles -- list of files to parse
inited -- flag set when init() has been called
suffixes_map -- dictionary mapping suffixes to suffixes
encodings_map -- dictionary mapping suffixes to encodings
types_map -- dictionary mapping suffixes to types
Functions:
init([files]) -- parse a list of files, default knownfiles
read_mime_types(file) -- parse one file, return a dictionary or None
"""
import string
import posixpath
import urllib
knownfiles = [
"/usr/local/etc/httpd/conf/mime.types",
"/usr/local/lib/netscape/mime.types",
"/usr/local/etc/httpd/conf/mime.types", # Apache 1.2
"/usr/local/etc/mime.types", # Apache 1.3
]
inited = 0
def guess_type(url):
"""Guess the type of a file based on its URL.
Return value is a tuple (type, encoding) where type is None if the
type can't be guessed (no or unknown suffix) or a string of the
form type/subtype, usable for a MIME Content-type header; and
encoding is None for no encoding or the name of the program used
to encode (e.g. compress or gzip). The mappings are table
driven. Encoding suffixes are case sensitive; type suffixes are
first tried case sensitive, then case insensitive.
The suffixes .tgz, .taz and .tz (case sensitive!) are all mapped
to ".tar.gz". (This is table-driven too, using the dictionary
suffix_map).
"""
if not inited:
init()
scheme, url = urllib.splittype(url)
if scheme == 'data':
# syntax of data URLs:
# dataurl := "data:" [ mediatype ] [ ";base64" ] "," data
# mediatype := [ type "/" subtype ] *( ";" parameter )
# data := *urlchar
# parameter := attribute "=" value
# type/subtype defaults to "text/plain"
comma = string.find(url, ',')
if comma < 0:
# bad data URL
return None, None
semi = string.find(url, ';', 0, comma)
if semi >= 0:
type = url[:semi]
else:
type = url[:comma]
if '=' in type or '/' not in type:
type = 'text/plain'
return type, None # never compressed, so encoding is None
base, ext = posixpath.splitext(url)
while suffix_map.has_key(ext):
base, ext = posixpath.splitext(base + suffix_map[ext])
if encodings_map.has_key(ext):
encoding = encodings_map[ext]
base, ext = posixpath.splitext(base)
else:
encoding = None
if types_map.has_key(ext):
return types_map[ext], encoding
elif types_map.has_key(string.lower(ext)):
return types_map[string.lower(ext)], encoding
else:
return None, encoding
def guess_extension(type):
"""Guess the extension for a file based on its MIME type.
Return value is a string giving a filename extension, including the
leading dot ('.'). The extension is not guaranteed to have been
associated with any particular data stream, but would be mapped to the
MIME type `type' by guess_type(). If no extension can be guessed for
`type', None is returned.
"""
global inited
if not inited:
init()
type = string.lower(type)
for ext, stype in types_map.items():
if type == stype:
return ext
return None
def init(files=None):
global inited
for file in files or knownfiles:
s = read_mime_types(file)
if s:
for key, value in s.items():
types_map[key] = value
inited = 1
def read_mime_types(file):
try:
f = open(file)
except IOError:
return None
map = {}
while 1:
line = f.readline()
if not line: break
words = string.split(line)
for i in range(len(words)):
if words[i][0] == '#':
del words[i:]
break
if not words: continue
type, suffixes = words[0], words[1:]
for suff in suffixes:
map['.'+suff] = type
f.close()
return map
suffix_map = {
'.tgz': '.tar.gz',
'.taz': '.tar.gz',
'.tz': '.tar.gz',
}
encodings_map = {
'.gz': 'gzip',
'.Z': 'compress',
}
types_map = {
'.a': 'application/octet-stream',
'.ai': 'application/postscript',
'.aif': 'audio/x-aiff',
'.aifc': 'audio/x-aiff',
'.aiff': 'audio/x-aiff',
'.au': 'audio/basic',
'.avi': 'video/x-msvideo',
'.bcpio': 'application/x-bcpio',
'.bin': 'application/octet-stream',
'.cdf': 'application/x-netcdf',
'.cpio': 'application/x-cpio',
'.csh': 'application/x-csh',
'.dll': 'application/octet-stream',
'.dvi': 'application/x-dvi',
'.exe': 'application/octet-stream',
'.eps': 'application/postscript',
'.etx': 'text/x-setext',
'.gif': 'image/gif',
'.gtar': 'application/x-gtar',
'.hdf': 'application/x-hdf',
'.htm': 'text/html',
'.html': 'text/html',
'.ief': 'image/ief',
'.jpe': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.jpg': 'image/jpeg',
'.js': 'application/x-javascript',
'.latex': 'application/x-latex',
'.man': 'application/x-troff-man',
'.me': 'application/x-troff-me',
'.mif': 'application/x-mif',
'.mov': 'video/quicktime',
'.movie': 'video/x-sgi-movie',
'.mpe': 'video/mpeg',
'.mpeg': 'video/mpeg',
'.mpg': 'video/mpeg',
'.ms': 'application/x-troff-ms',
'.nc': 'application/x-netcdf',
'.o': 'application/octet-stream',
'.obj': 'application/octet-stream',
'.oda': 'application/oda',
'.pbm': 'image/x-portable-bitmap',
'.pdf': 'application/pdf',
'.pgm': 'image/x-portable-graymap',
'.pnm': 'image/x-portable-anymap',
'.png': 'image/png',
'.ppm': 'image/x-portable-pixmap',
'.py': 'text/x-python',
'.pyc': 'application/x-python-code',
'.ps': 'application/postscript',
'.qt': 'video/quicktime',
'.ras': 'image/x-cmu-raster',
'.rgb': 'image/x-rgb',
'.rdf': 'application/xml',
'.roff': 'application/x-troff',
'.rtf': 'application/rtf',
'.rtx': 'text/richtext',
'.sgm': 'text/x-sgml',
'.sgml': 'text/x-sgml',
'.sh': 'application/x-sh',
'.shar': 'application/x-shar',
'.snd': 'audio/basic',
'.so': 'application/octet-stream',
'.src': 'application/x-wais-source',
'.sv4cpio': 'application/x-sv4cpio',
'.sv4crc': 'application/x-sv4crc',
'.t': 'application/x-troff',
'.tar': 'application/x-tar',
'.tcl': 'application/x-tcl',
'.tex': 'application/x-tex',
'.texi': 'application/x-texinfo',
'.texinfo': 'application/x-texinfo',
'.tif': 'image/tiff',
'.tiff': 'image/tiff',
'.tr': 'application/x-troff',
'.tsv': 'text/tab-separated-values',
'.txt': 'text/plain',
'.ustar': 'application/x-ustar',
'.wav': 'audio/x-wav',
'.xbm': 'image/x-xbitmap',
'.xml': 'text/xml',
'.xsl': 'application/xml',
'.xpm': 'image/x-xpixmap',
'.xwd': 'image/x-xwindowdump',
'.zip': 'application/zip',
}
"""Generic MIME writer.
Classes:
MimeWriter - the only thing here.
"""
import string
import mimetools
class MimeWriter:
"""Generic MIME writer.
Methods:
__init__()
addheader()
flushheaders()
startbody()
startmultipartbody()
nextpart()
lastpart()
A MIME writer is much more primitive than a MIME parser. It
doesn't seek around on the output file, and it doesn't use large
amounts of buffer space, so you have to write the parts in the
order they should occur on the output file. It does buffer the
headers you add, allowing you to rearrange their order.
General usage is:
f = <open the output file>
w = MimeWriter(f)
...call w.addheader(key, value) 0 or more times...
followed by either:
f = w.startbody(content_type)
...call f.write(data) for body data...
or:
w.startmultipartbody(subtype)
for each part:
subwriter = w.nextpart()
...use the subwriter's methods to create the subpart...
w.lastpart()
The subwriter is another MimeWriter instance, and should be
treated in the same way as the toplevel MimeWriter. This way,
writing recursive body parts is easy.
Warning: don't forget to call lastpart()!
XXX There should be more state so calls made in the wrong order
are detected.
Some special cases:
- startbody() just returns the file passed to the constructor;
but don't use this knowledge, as it may be changed.
- startmultipartbody() actually returns a file as well;
this can be used to write the initial 'if you can read this your
mailer is not MIME-aware' message.
- If you call flushheaders(), the headers accumulated so far are
written out (and forgotten); this is useful if you don't need a
body part at all, e.g. for a subpart of type message/rfc822
that's (mis)used to store some header-like information.
- Passing a keyword argument 'prefix=<flag>' to addheader(),
start*body() affects where the header is inserted; 0 means
append at the end, 1 means insert at the start; default is
append for addheader(), but insert for start*body(), which use
it to determine where the Content-Type header goes.
"""
def __init__(self, fp):
self._fp = fp
self._headers = []
def addheader(self, key, value, prefix=0):
lines = string.splitfields(value, "\n")
while lines and not lines[-1]: del lines[-1]
while lines and not lines[0]: del lines[0]
for i in range(1, len(lines)):
lines[i] = " " + string.strip(lines[i])
value = string.joinfields(lines, "\n") + "\n"
line = key + ": " + value
if prefix:
self._headers.insert(0, line)
else:
self._headers.append(line)
def flushheaders(self):
self._fp.writelines(self._headers)
self._headers = []
def startbody(self, ctype, plist=[], prefix=1):
for name, value in plist:
ctype = ctype + ';\n %s=\"%s\"' % (name, value)
self.addheader("Content-Type", ctype, prefix=prefix)
self.flushheaders()
self._fp.write("\n")
return self._fp
def startmultipartbody(self, subtype, boundary=None, plist=[], prefix=1):
self._boundary = boundary or mimetools.choose_boundary()
return self.startbody("multipart/" + subtype,
[("boundary", self._boundary)] + plist,
prefix=prefix)
def nextpart(self):
self._fp.write("\n--" + self._boundary + "\n")
return self.__class__(self._fp)
def lastpart(self):
self._fp.write("\n--" + self._boundary + "--\n")
if __name__ == '__main__':
import test.test_MimeWriter
"""A readline()-style interface to the parts of a multipart message.
The MultiFile class makes each part of a multipart message "feel" like
an ordinary file, as long as you use fp.readline(). Allows recursive
use, for nested multipart messages. Probably best used together
with module mimetools.
Suggested use:
real_fp = open(...)
fp = MultiFile(real_fp)
"read some lines from fp"
fp.push(separator)
while 1:
"read lines from fp until it returns an empty string" (A)
if not fp.next(): break
fp.pop()
"read remaining lines from fp until it returns an empty string"
The latter sequence may be used recursively at (A).
It is also allowed to use multiple push()...pop() sequences.
If seekable is given as 0, the class code will not do the bookkeeping
it normally attempts in order to make seeks relative to the beginning of the
current file part. This may be useful when using MultiFile with a non-
seekable stream object.
"""
import sys
import string
class Error(Exception):
pass
class MultiFile:
seekable = 0
def __init__(self, fp, seekable=1):
self.fp = fp
self.stack = [] # Grows down
self.level = 0
self.last = 0
if seekable:
self.seekable = 1
self.start = self.fp.tell()
self.posstack = [] # Grows down
def tell(self):
if self.level > 0:
return self.lastpos
return self.fp.tell() - self.start
def seek(self, pos, whence=0):
here = self.tell()
if whence:
if whence == 1:
pos = pos + here
elif whence == 2:
if self.level > 0:
pos = pos + self.lastpos
else:
raise Error, "can't use whence=2 yet"
if not 0 <= pos <= here or \
self.level > 0 and pos > self.lastpos:
raise Error, 'bad MultiFile.seek() call'
self.fp.seek(pos + self.start)
self.level = 0
self.last = 0
def readline(self):
if self.level > 0:
return ''
line = self.fp.readline()
# Real EOF?
if not line:
self.level = len(self.stack)
self.last = (self.level > 0)
if self.last:
raise Error, 'sudden EOF in MultiFile.readline()'
return ''
assert self.level == 0
# Fast check to see if this is just data
if self.is_data(line):
return line
else:
# Ignore trailing whitespace on marker lines
k = len(line) - 1;
while line[k] in string.whitespace:
k = k - 1
marker = line[:k+1]
# No? OK, try to match a boundary.
# Return the line (unstripped) if we don't.
for i in range(len(self.stack)):
sep = self.stack[i]
if marker == self.section_divider(sep):
self.last = 0
break
elif marker == self.end_marker(sep):
self.last = 1
break
else:
return line
# We only get here if we see a section divider or EOM line
if self.seekable:
self.lastpos = self.tell() - len(line)
self.level = i+1
if self.level > 1:
raise Error,'Missing endmarker in MultiFile.readline()'
return ''
def readlines(self):
list = []
while 1:
line = self.readline()
if not line: break
list.append(line)
return list
def read(self): # Note: no size argument -- read until EOF only!
return string.joinfields(self.readlines(), '')
def next(self):
while self.readline(): pass
if self.level > 1 or self.last:
return 0
self.level = 0
self.last = 0
if self.seekable:
self.start = self.fp.tell()
return 1
def push(self, sep):
if self.level > 0:
raise Error, 'bad MultiFile.push() call'
self.stack.insert(0, sep)
if self.seekable:
self.posstack.insert(0, self.start)
self.start = self.fp.tell()
def pop(self):
if self.stack == []:
raise Error, 'bad MultiFile.pop() call'
if self.level <= 1:
self.last = 0
else:
abslastpos = self.lastpos + self.start
self.level = max(0, self.level - 1)
del self.stack[0]
if self.seekable:
self.start = self.posstack[0]
del self.posstack[0]
if self.level > 0:
self.lastpos = abslastpos - self.start
def is_data(self, line):
return line[:2] <> '--'
def section_divider(self, str):
return "--" + str
def end_marker(self, str):
return "--" + str + "--"
"""Convert a NT pathname to a file URL and vice versa."""
def url2pathname(url):
r"""Convert a URL to a DOS path.
///C|/foo/bar/spam.foo
becomes
C:\foo\bar\spam.foo
"""
import string, urllib
if not '|' in url:
# No drive specifier, just convert slashes
if url[:4] == '////':
# path is something like ////host/path/on/remote/host
# convert this to \\host\path\on\remote\host
# (notice halving of slashes at the start of the path)
url = url[2:]
components = string.split(url, '/')
# make sure not to convert quoted slashes :-)
return urllib.unquote(string.join(components, '\\'))
comp = string.split(url, '|')
if len(comp) != 2 or comp[0][-1] not in string.letters:
error = 'Bad URL: ' + url
raise IOError, error
drive = string.upper(comp[0][-1])
components = string.split(comp[1], '/')
path = drive + ':'
for comp in components:
if comp:
path = path + '\\' + urllib.unquote(comp)
return path
def pathname2url(p):
r"""Convert a DOS path name to a file url.
C:\foo\bar\spam.foo
becomes
///C|/foo/bar/spam.foo
"""
import string, urllib
if not ':' in p:
# No drive specifier, just convert slashes and quote the name
if p[:2] == '\\\\':
# path is something like \\host\path\on\remote\host
# convert this to ////host/path/on/remote/host
# (notice doubling of slashes at the start of the path)
p = '\\\\' + p
components = string.split(p, '\\')
return urllib.quote(string.join(components, '/'))
comp = string.split(p, ':')
if len(comp) != 2 or len(comp[0]) > 1:
error = 'Bad path: ' + p
raise IOError, error
drive = urllib.quote(string.upper(comp[0]))
components = string.split(comp[1], '\\')
path = '///' + drive + '|'
for comp in components:
if comp:
path = path + '/' + urllib.quote(comp)
return path
"""Extended file operations available in POSIX.
f = posixfile.open(filename, [mode, [bufsize]])
will create a new posixfile object
f = posixfile.fileopen(fileobject)
will create a posixfile object from a builtin file object
f.file()
will return the original builtin file object
f.dup()
will return a new file object based on a new filedescriptor
f.dup2(fd)
will return a new file object based on the given filedescriptor
f.flags(mode)
will turn on the associated flag (merge)
mode can contain the following characters:
(character representing a flag)
a append only flag
c close on exec flag
n no delay flag
s synchronization flag
(modifiers)
! turn flags 'off' instead of default 'on'
= copy flags 'as is' instead of default 'merge'
? return a string in which the characters represent the flags
that are set
note: - the '!' and '=' modifiers are mutually exclusive.
- the '?' modifier will return the status of the flags after they
have been changed by other characters in the mode string
f.lock(mode [, len [, start [, whence]]])
will (un)lock a region
mode can contain the following characters:
(character representing type of lock)
u unlock
r read lock
w write lock
(modifiers)
| wait until the lock can be granted
? return the first lock conflicting with the requested lock
or 'None' if there is no conflict. The lock returned is in the
format (mode, len, start, whence, pid) where mode is a
character representing the type of lock ('r' or 'w')
note: - the '?' modifier prevents a region from being locked; it is
query only
"""
class _posixfile_:
"""File wrapper class that provides extra POSIX file routines."""
states = ['open', 'closed']
#
# Internal routines
#
def __repr__(self):
file = self._file_
return "<%s posixfile '%s', mode '%s' at %s>" % \
(self.states[file.closed], file.name, file.mode, \
hex(id(self))[2:])
#
# Initialization routines
#
def open(self, name, mode='r', bufsize=-1):
import __builtin__
return self.fileopen(__builtin__.open(name, mode, bufsize))
def fileopen(self, file):
if `type(file)` != "<type 'file'>":
raise TypeError, 'posixfile.fileopen() arg must be file object'
self._file_ = file
# Copy basic file methods
for method in file.__methods__:
setattr(self, method, getattr(file, method))
return self
#
# New methods
#
def file(self):
return self._file_
def dup(self):
import posix
try: ignore = posix.fdopen
except: raise AttributeError, 'dup() method unavailable'
return posix.fdopen(posix.dup(self._file_.fileno()), self._file_.mode)
def dup2(self, fd):
import posix
try: ignore = posix.fdopen
except: raise AttributeError, 'dup() method unavailable'
posix.dup2(self._file_.fileno(), fd)
return posix.fdopen(fd, self._file_.mode)
def flags(self, *which):
import fcntl, FCNTL
if which:
if len(which) > 1:
raise TypeError, 'Too many arguments'
which = which[0]
else: which = '?'
l_flags = 0
if 'n' in which: l_flags = l_flags | FCNTL.O_NDELAY
if 'a' in which: l_flags = l_flags | FCNTL.O_APPEND
if 's' in which: l_flags = l_flags | FCNTL.O_SYNC
file = self._file_
if '=' not in which:
cur_fl = fcntl.fcntl(file.fileno(), FCNTL.F_GETFL, 0)
if '!' in which: l_flags = cur_fl & ~ l_flags
else: l_flags = cur_fl | l_flags
l_flags = fcntl.fcntl(file.fileno(), FCNTL.F_SETFL, l_flags)
if 'c' in which:
arg = ('!' not in which) # 0 is don't, 1 is do close on exec
l_flags = fcntl.fcntl(file.fileno(), FCNTL.F_SETFD, arg)
if '?' in which:
which = '' # Return current flags
l_flags = fcntl.fcntl(file.fileno(), FCNTL.F_GETFL, 0)
if FCNTL.O_APPEND & l_flags: which = which + 'a'
if fcntl.fcntl(file.fileno(), FCNTL.F_GETFD, 0) & 1:
which = which + 'c'
if FCNTL.O_NDELAY & l_flags: which = which + 'n'
if FCNTL.O_SYNC & l_flags: which = which + 's'
return which
def lock(self, how, *args):
import struct, fcntl, FCNTL
if 'w' in how: l_type = FCNTL.F_WRLCK
elif 'r' in how: l_type = FCNTL.F_RDLCK
elif 'u' in how: l_type = FCNTL.F_UNLCK
else: raise TypeError, 'no type of lock specified'
if '|' in how: cmd = FCNTL.F_SETLKW
elif '?' in how: cmd = FCNTL.F_GETLK
else: cmd = FCNTL.F_SETLK
l_whence = 0
l_start = 0
l_len = 0
if len(args) == 1:
l_len = args[0]
elif len(args) == 2:
l_len, l_start = args
elif len(args) == 3:
l_len, l_start, l_whence = args
elif len(args) > 3:
raise TypeError, 'too many arguments'
# Hack by davem@magnet.com to get locking to go on freebsd;
# additions for AIX by Vladimir.Marangozov@imag.fr
import sys, os
if sys.platform in ('netbsd1',
'openbsd2',
'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
'bsdos2', 'bsdos3', 'bsdos4'):
flock = struct.pack('lxxxxlxxxxlhh', \
l_start, l_len, os.getpid(), l_type, l_whence)
elif sys.platform in ['aix3', 'aix4']:
flock = struct.pack('hhlllii', \
l_type, l_whence, l_start, l_len, 0, 0, 0)
else:
flock = struct.pack('hhllhh', \
l_type, l_whence, l_start, l_len, 0, 0)
flock = fcntl.fcntl(self._file_.fileno(), cmd, flock)
if '?' in how:
if sys.platform in ('netbsd1',
'openbsd2',
'freebsd2', 'freebsd3', 'freebsd4', 'freebsd5',
'bsdos2', 'bsdos3', 'bsdos4'):
l_start, l_len, l_pid, l_type, l_whence = \
struct.unpack('lxxxxlxxxxlhh', flock)
elif sys.platform in ['aix3', 'aix4']:
l_type, l_whence, l_start, l_len, l_sysid, l_pid, l_vfs = \
struct.unpack('hhlllii', flock)
elif sys.platform == "linux2":
l_type, l_whence, l_start, l_len, l_pid, l_sysid = \
struct.unpack('hhllhh', flock)
else:
l_type, l_whence, l_start, l_len, l_sysid, l_pid = \
struct.unpack('hhllhh', flock)
if l_type != FCNTL.F_UNLCK:
if l_type == FCNTL.F_RDLCK:
return 'r', l_len, l_start, l_whence, l_pid
else:
return 'w', l_len, l_start, l_whence, l_pid
def open(name, mode='r', bufsize=-1):
"""Public routine to open a file as a posixfile object."""
return _posixfile_().open(name, mode, bufsize)
def fileopen(file):
"""Public routine to get a posixfile object from a Python file object."""
return _posixfile_().fileopen(file)
#
# Constants
#
SEEK_SET = 0
SEEK_CUR = 1
SEEK_END = 2
#
# End of posixfile.py
#
This diff is collapsed.
"""Routine to "compile" a .py file to a .pyc (or .pyo) file.
This module has intimate knowledge of the format of .pyc files.
"""
import imp
MAGIC = imp.get_magic()
def wr_long(f, x):
"""Internal; write a 32-bit int to a file in little-endian order."""
f.write(chr( x & 0xff))
f.write(chr((x >> 8) & 0xff))
f.write(chr((x >> 16) & 0xff))
f.write(chr((x >> 24) & 0xff))
def compile(file, cfile=None, dfile=None):
"""Byte-compile one Python source file to Python bytecode.
Arguments:
file: source filename
cfile: target filename; defaults to source with 'c' or 'o' appended
('c' normally, 'o' in optimizing mode, giving .pyc or .pyo)
dfile: purported filename; defaults to source (this is the filename
that will show up in error messages)
Note that it isn't necessary to byte-compile Python modules for
execution efficiency -- Python itself byte-compiles a module when
it is loaded, and if it can, writes out the bytecode to the
corresponding .pyc (or .pyo) file.
However, if a Python installation is shared between users, it is a
good idea to byte-compile all modules upon installation, since
other users may not be able to write in the source directories,
and thus they won't be able to write the .pyc/.pyo file, and then
they would be byte-compiling every module each time it is loaded.
This can slow down program start-up considerably.
See compileall.py for a script/module that uses this module to
byte-compile all installed files (or all files in selected
directories).
"""
import os, marshal, __builtin__
f = open(file)
try:
timestamp = long(os.fstat(f.fileno())[8])
except AttributeError:
timestamp = long(os.stat(file)[8])
codestring = f.read()
# If parsing from a string, line breaks are \n (see parsetok.c:tok_nextc)
# Replace will return original string if pattern is not found, so
# we don't need to check whether it is found first.
codestring = codestring.replace("\r\n","\n")
codestring = codestring.replace("\r","\n")
f.close()
if codestring and codestring[-1] != '\n':
codestring = codestring + '\n'
try:
codeobject = __builtin__.compile(codestring, dfile or file, 'exec')
except SyntaxError, detail:
import traceback, sys, string
lines = traceback.format_exception_only(SyntaxError, detail)
for line in lines:
sys.stderr.write(string.replace(line, 'File "<string>"',
'File "%s"' % (dfile or file)))
return
if not cfile:
cfile = file + (__debug__ and 'c' or 'o')
fc = open(cfile, 'wb')
fc.write('\0\0\0\0')
wr_long(fc, timestamp)
marshal.dump(codeobject, fc)
fc.flush()
fc.seek(0, 0)
fc.write(MAGIC)
fc.close()
if os.name == 'mac':
import macfs
macfs.FSSpec(cfile).SetCreatorType('Pyth', 'PYC ')
"""A multi-producer, multi-consumer queue."""
# define this exception to be compatible with Python 1.5's class
# exceptions, but also when -X option is used.
try:
class Empty(Exception):
pass
class Full(Exception):
pass
except TypeError:
# string based exceptions
# exception raised by get(block=0)/get_nowait()
Empty = 'Queue.Empty'
# exception raised by put(block=0)/put_nowait()
Full = 'Queue.Full'
class Queue:
def __init__(self, maxsize=0):
"""Initialize a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
"""
import thread
self._init(maxsize)
self.mutex = thread.allocate_lock()
self.esema = thread.allocate_lock()
self.esema.acquire()
self.fsema = thread.allocate_lock()
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
self.mutex.acquire()
n = self._qsize()
self.mutex.release()
return n
def empty(self):
"""Return 1 if the queue is empty, 0 otherwise (not reliable!)."""
self.mutex.acquire()
n = self._empty()
self.mutex.release()
return n
def full(self):
"""Return 1 if the queue is full, 0 otherwise (not reliable!)."""
self.mutex.acquire()
n = self._full()
self.mutex.release()
return n
def put(self, item, block=1):
"""Put an item into the queue.
If optional arg 'block' is 1 (the default), block if
necessary until a free slot is available. Otherwise (block
is 0), put an item on the queue if a free slot is immediately
available, else raise the Full exception.
"""
if block:
self.fsema.acquire()
elif not self.fsema.acquire(0):
raise Full
self.mutex.acquire()
was_empty = self._empty()
self._put(item)
if was_empty:
self.esema.release()
if not self._full():
self.fsema.release()
self.mutex.release()
def put_nowait(self, item):
"""Put an item into the queue without blocking.
Only enqueue the item if a free slot is immediately available.
Otherwise raise the Full exception.
"""
return self.put(item, 0)
def get(self, block=1):
"""Remove and return an item from the queue.
If optional arg 'block' is 1 (the default), block if
necessary until an item is available. Otherwise (block is 0),
return an item if one is immediately available, else raise the
Empty exception.
"""
if block:
self.esema.acquire()
elif not self.esema.acquire(0):
raise Empty
self.mutex.acquire()
was_full = self._full()
item = self._get()
if was_full:
self.fsema.release()
if not self._empty():
self.esema.release()
self.mutex.release()
return item
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
"""
return self.get(0)
# Override these methods to implement other queue organizations
# (e.g. stack or priority queue).
# These will only be called with appropriate locks held
# Initialize the queue representation
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = []
def _qsize(self):
return len(self.queue)
# Check whether the queue is empty
def _empty(self):
return not self.queue
# Check whether the queue is full
def _full(self):
return self.maxsize > 0 and len(self.queue) == self.maxsize
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
item = self.queue[0]
del self.queue[0]
return item
This diff is collapsed.
"""Constants for selecting regexp syntaxes for the obsolete regex module.
This module is only for backward compatibility. "regex" has now
been replaced by the new regular expression module, "re".
These bits are passed to regex.set_syntax() to choose among
alternative regexp syntaxes.
"""
# 1 means plain parentheses serve as grouping, and backslash
# parentheses are needed for literal searching.
# 0 means backslash-parentheses are grouping, and plain parentheses
# are for literal searching.
RE_NO_BK_PARENS = 1
# 1 means plain | serves as the "or"-operator, and \| is a literal.
# 0 means \| serves as the "or"-operator, and | is a literal.
RE_NO_BK_VBAR = 2
# 0 means plain + or ? serves as an operator, and \+, \? are literals.
# 1 means \+, \? are operators and plain +, ? are literals.
RE_BK_PLUS_QM = 4
# 1 means | binds tighter than ^ or $.
# 0 means the contrary.
RE_TIGHT_VBAR = 8
# 1 means treat \n as an _OR operator
# 0 means treat it as a normal character
RE_NEWLINE_OR = 16
# 0 means that a special characters (such as *, ^, and $) always have
# their special meaning regardless of the surrounding context.
# 1 means that special characters may act as normal characters in some
# contexts. Specifically, this applies to:
# ^ - only special at the beginning, or after ( or |
# $ - only special at the end, or before ) or |
# *, +, ? - only special when not after the beginning, (, or |
RE_CONTEXT_INDEP_OPS = 32
# ANSI sequences (\n etc) and \xhh
RE_ANSI_HEX = 64
# No GNU extensions
RE_NO_GNU_EXTENSIONS = 128
# Now define combinations of bits for the standard possibilities.
RE_SYNTAX_AWK = (RE_NO_BK_PARENS | RE_NO_BK_VBAR | RE_CONTEXT_INDEP_OPS)
RE_SYNTAX_EGREP = (RE_SYNTAX_AWK | RE_NEWLINE_OR)
RE_SYNTAX_GREP = (RE_BK_PLUS_QM | RE_NEWLINE_OR)
RE_SYNTAX_EMACS = 0
# (Python's obsolete "regexp" module used a syntax similar to awk.)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Test the exit module
from test_support import verbose
import atexit
def handler1():
print "handler1"
def handler2(*args, **kargs):
print "handler2", args, kargs
# save any exit functions that may have been registered as part of the
# test framework
_exithandlers = atexit._exithandlers
atexit._exithandlers = []
atexit.register(handler1)
atexit.register(handler2)
atexit.register(handler2, 7, kw="abc")
# simulate exit behavior by calling atexit._run_exitfuncs directly...
atexit._run_exitfuncs()
# restore exit handlers
atexit._exithandlers = _exithandlers
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Python test set -- part 4, built-in functions
from test_support import *
print '4. Built-in functions'
print 'test_b1'
unload('test_b1')
import test_b1
print 'test_b2'
unload('test_b2')
import test_b2
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
# Test the cPickle module
import cPickle
import test_pickle
test_pickle.dotest(cPickle)
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment