Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
C
cpython
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
cpython
Commits
67db4b14
Commit
67db4b14
authored
Oct 02, 1998
by
Guido van Rossum
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Some new blood and some updated versions.
parent
49db01ff
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
1105 additions
and
14 deletions
+1105
-14
Lib/dos-8x3/exceptio.py
Lib/dos-8x3/exceptio.py
+20
-12
Lib/dos-8x3/posixpat.py
Lib/dos-8x3/posixpat.py
+2
-0
Lib/dos-8x3/py_compi.py
Lib/dos-8x3/py_compi.py
+9
-1
Lib/dos-8x3/stringio.py
Lib/dos-8x3/stringio.py
+14
-1
Lib/dos-8x3/test_lon.py
Lib/dos-8x3/test_lon.py
+252
-0
Lib/dos-8x3/test_mim.py
Lib/dos-8x3/test_mim.py
+170
-0
Lib/dos-8x3/threadin.py
Lib/dos-8x3/threadin.py
+638
-0
No files found.
Lib/dos-8x3/exceptio.py
View file @
67db4b14
"""Class based built-in exception hierarchy.
This is a new feature whereby all the standard built-in exceptions,
traditionally string objects, are replaced with classes. This gives
Python's exception handling mechanism a more object-oriented feel.
New with Python 1.5, all standard built-in exceptions are now class objects by
default. This gives Python's exception handling mechanism a more
object-oriented feel. Traditionally they were string objects. Python will
fallback to string based exceptions if the interpreter is invoked with the -X
option, or if some failure occurs during class exception initialization (in
this case a warning will be printed).
Most existing code should continue to work with class based
exceptions. Some tricky uses of IOError may break, but the most
common uses should work.
Most existing code should continue to work with class based exceptions. Some
tricky uses of IOError may break, but the most common uses should work.
To disable this feature, start the Python executable with the -X option.
Here is a rundown of the class hierarchy. You can change this by editing this
file, but it isn't recommended. The class names described here are expected
to be found by the bltinmodule.c file.
Here is a rundown of the class hierarchy. You can change this by
editing this file, but it isn't recommended. The classes with a `*'
are new with this feature. They are defined as tuples containing the
derived exceptions when string-based exceptions are used
.
The classes with a `*' are new as of Python 1.5. They are defined as tuples
containing the derived exceptions when string-based exceptions are used. If
you define your own class based exceptions, they should be derived from
Exception
.
Exception(*)
|
...
...
@@ -22,7 +26,11 @@ Exception(*)
+-- SystemExit
+-- KeyboardInterrupt
+-- ImportError
+-- IOError
+-- EnvironmentError(*)
| |
| +-- IOError
| +-- OSError(*)
|
+-- EOFError
+-- RuntimeError
+-- NameError
...
...
Lib/dos-8x3/posixpat.py
View file @
67db4b14
...
...
@@ -354,6 +354,8 @@ def normpath(path):
while
i
<
len
(
comps
):
if
comps
[
i
]
==
'.'
:
del
comps
[
i
]
while
i
<
len
(
comps
)
and
comps
[
i
]
==
''
:
del
comps
[
i
]
elif
comps
[
i
]
==
'..'
and
i
>
0
and
comps
[
i
-
1
]
not
in
(
''
,
'..'
):
del
comps
[
i
-
1
:
i
+
1
]
i
=
i
-
1
...
...
Lib/dos-8x3/py_compi.py
View file @
67db4b14
...
...
@@ -51,7 +51,15 @@ def compile(file, cfile=None, dfile=None):
f
.
close
()
if
codestring
and
codestring
[
-
1
]
!=
'
\
n
'
:
codestring
=
codestring
+
'
\
n
'
codeobject
=
__builtin__
.
compile
(
codestring
,
dfile
or
file
,
'exec'
)
try
:
codeobject
=
__builtin__
.
compile
(
codestring
,
dfile
or
file
,
'exec'
)
except
SyntaxError
,
detail
:
import
traceback
,
sys
,
string
lines
=
traceback
.
format_exception_only
(
SyntaxError
,
detail
)
for
line
in
lines
:
sys
.
stderr
.
write
(
string
.
replace
(
line
,
'File "<string>"'
,
'File "%s"'
%
(
dfile
or
file
)))
return
if
not
cfile
:
cfile
=
file
+
(
__debug__
and
'c'
or
'o'
)
fc
=
open
(
cfile
,
'wb'
)
...
...
Lib/dos-8x3/stringio.py
View file @
67db4b14
...
...
@@ -41,8 +41,12 @@ class StringIO:
self
.
closed
=
1
del
self
.
buf
,
self
.
pos
def
isatty
(
self
):
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
return
0
def
seek
(
self
,
pos
,
mode
=
0
):
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
if
self
.
buflist
:
self
.
buf
=
self
.
buf
+
string
.
joinfields
(
self
.
buflist
,
''
)
self
.
buflist
=
[]
...
...
@@ -52,8 +56,12 @@ class StringIO:
pos
=
pos
+
self
.
len
self
.
pos
=
max
(
0
,
pos
)
def
tell
(
self
):
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
return
self
.
pos
def
read
(
self
,
n
=
-
1
):
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
if
self
.
buflist
:
self
.
buf
=
self
.
buf
+
string
.
joinfields
(
self
.
buflist
,
''
)
self
.
buflist
=
[]
...
...
@@ -65,6 +73,8 @@ class StringIO:
self
.
pos
=
newpos
return
r
def
readline
(
self
,
length
=
None
):
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
if
self
.
buflist
:
self
.
buf
=
self
.
buf
+
string
.
joinfields
(
self
.
buflist
,
''
)
self
.
buflist
=
[]
...
...
@@ -87,6 +97,8 @@ class StringIO:
line
=
self
.
readline
()
return
lines
def
write
(
self
,
s
):
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
if
not
s
:
return
if
self
.
pos
>
self
.
len
:
self
.
buflist
.
append
(
'
\
0
'
*
(
self
.
pos
-
self
.
len
))
...
...
@@ -105,7 +117,8 @@ class StringIO:
def
writelines
(
self
,
list
):
self
.
write
(
string
.
joinfields
(
list
,
''
))
def
flush
(
self
):
pass
if
self
.
closed
:
raise
ValueError
,
"I/O operation on closed file"
def
getvalue
(
self
):
if
self
.
buflist
:
self
.
buf
=
self
.
buf
+
string
.
joinfields
(
self
.
buflist
,
''
)
...
...
Lib/dos-8x3/test_lon.py
0 → 100644
View file @
67db4b14
from
test_support
import
TestFailed
,
verbose
from
string
import
join
from
random
import
random
,
randint
# SHIFT should match the value in longintrepr.h for best testing.
SHIFT
=
15
BASE
=
2
**
SHIFT
MASK
=
BASE
-
1
# Max number of base BASE digits to use in test cases. Doubling
# this will at least quadruple the runtime.
MAXDIGITS
=
10
# build some special values
special
=
map
(
long
,
[
0
,
1
,
2
,
BASE
,
BASE
>>
1
])
special
.
append
(
0x5555555555555555
L
)
special
.
append
(
0xaaaaaaaaaaaaaaaa
L
)
# some solid strings of one bits
p2
=
4L
# 0 and 1 already added
for
i
in
range
(
2
*
SHIFT
):
special
.
append
(
p2
-
1
)
p2
=
p2
<<
1
del
p2
# add complements & negations
special
=
special
+
map
(
lambda
x
:
~
x
,
special
)
+
\
map
(
lambda
x
:
-
x
,
special
)
# ------------------------------------------------------------ utilities
# Use check instead of assert so the test still does something
# under -O.
def
check
(
ok
,
*
args
):
if
not
ok
:
raise
TestFailed
,
join
(
map
(
str
,
args
),
" "
)
# Get quasi-random long consisting of ndigits digits (in base BASE).
# quasi == the most-significant digit will not be 0, and the number
# is constructed to contain long strings of 0 and 1 bits. These are
# more likely than random bits to provoke digit-boundary errors.
# The sign of the number is also random.
def
getran
(
ndigits
):
assert
ndigits
>
0
nbits_hi
=
ndigits
*
SHIFT
nbits_lo
=
nbits_hi
-
SHIFT
+
1
answer
=
0L
nbits
=
0
r
=
int
(
random
()
*
(
SHIFT
*
2
))
|
1
# force 1 bits to start
while
nbits
<
nbits_lo
:
bits
=
(
r
>>
1
)
+
1
bits
=
min
(
bits
,
nbits_hi
-
nbits
)
assert
1
<=
bits
<=
SHIFT
nbits
=
nbits
+
bits
answer
=
answer
<<
bits
if
r
&
1
:
answer
=
answer
|
((
1
<<
bits
)
-
1
)
r
=
int
(
random
()
*
(
SHIFT
*
2
))
assert
nbits_lo
<=
nbits
<=
nbits_hi
if
random
()
<
0.5
:
answer
=
-
answer
return
answer
# Get random long consisting of ndigits random digits (relative to base
# BASE). The sign bit is also random.
def
getran2
(
ndigits
):
answer
=
0L
for
i
in
range
(
ndigits
):
answer
=
(
answer
<<
SHIFT
)
|
randint
(
0
,
MASK
)
if
random
()
<
0.5
:
answer
=
-
answer
return
answer
# --------------------------------------------------------------- divmod
def
test_division_2
(
x
,
y
):
q
,
r
=
divmod
(
x
,
y
)
q2
,
r2
=
x
/
y
,
x
%
y
check
(
q
==
q2
,
"divmod returns different quotient than / for"
,
x
,
y
)
check
(
r
==
r2
,
"divmod returns different mod than % for"
,
x
,
y
)
check
(
x
==
q
*
y
+
r
,
"x != q*y + r after divmod on"
,
x
,
y
)
if
y
>
0
:
check
(
0
<=
r
<
y
,
"bad mod from divmod on"
,
x
,
y
)
else
:
check
(
y
<
r
<=
0
,
"bad mod from divmod on"
,
x
,
y
)
def
test_division
(
maxdigits
=
MAXDIGITS
):
print
"long / * % divmod"
digits
=
range
(
1
,
maxdigits
+
1
)
for
lenx
in
digits
:
x
=
getran
(
lenx
)
for
leny
in
digits
:
y
=
getran
(
leny
)
or
1L
test_division_2
(
x
,
y
)
# -------------------------------------------------------------- ~ & | ^
def
test_bitop_identities_1
(
x
):
check
(
x
&
0
==
0
,
"x & 0 != 0 for"
,
x
)
check
(
x
|
0
==
x
,
"x | 0 != x for"
,
x
)
check
(
x
^
0
==
x
,
"x ^ 0 != x for"
,
x
)
check
(
x
&
-
1
==
x
,
"x & -1 != x for"
,
x
)
check
(
x
|
-
1
==
-
1
,
"x | -1 != -1 for"
,
x
)
check
(
x
^
-
1
==
~
x
,
"x ^ -1 != ~x for"
,
x
)
check
(
x
==
~~
x
,
"x != ~~x for"
,
x
)
check
(
x
&
x
==
x
,
"x & x != x for"
,
x
)
check
(
x
|
x
==
x
,
"x | x != x for"
,
x
)
check
(
x
^
x
==
0
,
"x ^ x != 0 for"
,
x
)
check
(
x
&
~
x
==
0
,
"x & ~x != 0 for"
,
x
)
check
(
x
|
~
x
==
-
1
,
"x | ~x != -1 for"
,
x
)
check
(
x
^
~
x
==
-
1
,
"x ^ ~x != -1 for"
,
x
)
check
(
-
x
==
1
+
~
x
==
~
(
x
-
1
),
"not -x == 1 + ~x == ~(x-1) for"
,
x
)
for
n
in
range
(
2
*
SHIFT
):
p2
=
2L
**
n
check
(
x
<<
n
>>
n
==
x
,
"x << n >> n != x for"
,
x
,
n
)
check
(
x
/
p2
==
x
>>
n
,
"x / p2 != x >> n for x n p2"
,
x
,
n
,
p2
)
check
(
x
*
p2
==
x
<<
n
,
"x * p2 != x << n for x n p2"
,
x
,
n
,
p2
)
check
(
x
&
-
p2
==
x
>>
n
<<
n
==
x
&
~
(
p2
-
1
),
"not x & -p2 == x >> n << n == x & ~(p2 - 1) for x n p2"
,
x
,
n
,
p2
)
def
test_bitop_identities_2
(
x
,
y
):
check
(
x
&
y
==
y
&
x
,
"x & y != y & x for"
,
x
,
y
)
check
(
x
|
y
==
y
|
x
,
"x | y != y | x for"
,
x
,
y
)
check
(
x
^
y
==
y
^
x
,
"x ^ y != y ^ x for"
,
x
,
y
)
check
(
x
^
y
^
x
==
y
,
"x ^ y ^ x != y for"
,
x
,
y
)
check
(
x
&
y
==
~
(
~
x
|
~
y
),
"x & y != ~(~x | ~y) for"
,
x
,
y
)
check
(
x
|
y
==
~
(
~
x
&
~
y
),
"x | y != ~(~x & ~y) for"
,
x
,
y
)
check
(
x
^
y
==
(
x
|
y
)
&
~
(
x
&
y
),
"x ^ y != (x | y) & ~(x & y) for"
,
x
,
y
)
check
(
x
^
y
==
(
x
&
~
y
)
|
(
~
x
&
y
),
"x ^ y == (x & ~y) | (~x & y) for"
,
x
,
y
)
check
(
x
^
y
==
(
x
|
y
)
&
(
~
x
|
~
y
),
"x ^ y == (x | y) & (~x | ~y) for"
,
x
,
y
)
def
test_bitop_identities_3
(
x
,
y
,
z
):
check
((
x
&
y
)
&
z
==
x
&
(
y
&
z
),
"(x & y) & z != x & (y & z) for"
,
x
,
y
,
z
)
check
((
x
|
y
)
|
z
==
x
|
(
y
|
z
),
"(x | y) | z != x | (y | z) for"
,
x
,
y
,
z
)
check
((
x
^
y
)
^
z
==
x
^
(
y
^
z
),
"(x ^ y) ^ z != x ^ (y ^ z) for"
,
x
,
y
,
z
)
check
(
x
&
(
y
|
z
)
==
(
x
&
y
)
|
(
x
&
z
),
"x & (y | z) != (x & y) | (x & z) for"
,
x
,
y
,
z
)
check
(
x
|
(
y
&
z
)
==
(
x
|
y
)
&
(
x
|
z
),
"x | (y & z) != (x | y) & (x | z) for"
,
x
,
y
,
z
)
def
test_bitop_identities
(
maxdigits
=
MAXDIGITS
):
print
"long bit-operation identities"
for
x
in
special
:
test_bitop_identities_1
(
x
)
digits
=
range
(
1
,
maxdigits
+
1
)
for
lenx
in
digits
:
x
=
getran
(
lenx
)
test_bitop_identities_1
(
x
)
for
leny
in
digits
:
y
=
getran
(
leny
)
test_bitop_identities_2
(
x
,
y
)
test_bitop_identities_3
(
x
,
y
,
getran
((
lenx
+
leny
)
/
2
))
# ------------------------------------------------------ hex oct str atol
def
slow_format
(
x
,
base
):
if
(
x
,
base
)
==
(
0
,
8
):
# this is an oddball!
return
"0L"
digits
=
[]
sign
=
0
if
x
<
0
:
sign
,
x
=
1
,
-
x
while
x
:
x
,
r
=
divmod
(
x
,
base
)
digits
.
append
(
int
(
r
))
digits
.
reverse
()
digits
=
digits
or
[
0
]
return
'-'
[:
sign
]
+
\
{
8
:
'0'
,
10
:
''
,
16
:
'0x'
}[
base
]
+
\
join
(
map
(
lambda
i
:
"0123456789ABCDEF"
[
i
],
digits
),
''
)
+
\
"L"
def
test_format_1
(
x
):
from
string
import
atol
for
base
,
mapper
in
(
8
,
oct
),
(
10
,
str
),
(
16
,
hex
):
got
=
mapper
(
x
)
expected
=
slow_format
(
x
,
base
)
check
(
got
==
expected
,
mapper
.
__name__
,
"returned"
,
got
,
"but expected"
,
expected
,
"for"
,
x
)
check
(
atol
(
got
,
0
)
==
x
,
'atol("%s", 0) !='
%
got
,
x
)
def
test_format
(
maxdigits
=
MAXDIGITS
):
print
"long str/hex/oct/atol"
for
x
in
special
:
test_format_1
(
x
)
for
i
in
range
(
10
):
for
lenx
in
range
(
1
,
maxdigits
+
1
):
x
=
getran
(
lenx
)
test_format_1
(
x
)
# ----------------------------------------------------------------- misc
def
test_misc
(
maxdigits
=
MAXDIGITS
):
print
"long miscellaneous operations"
import
sys
# check the extremes in int<->long conversion
hugepos
=
sys
.
maxint
hugeneg
=
-
hugepos
-
1
hugepos_aslong
=
long
(
hugepos
)
hugeneg_aslong
=
long
(
hugeneg
)
check
(
hugepos
==
hugepos_aslong
,
"long(sys.maxint) != sys.maxint"
)
check
(
hugeneg
==
hugeneg_aslong
,
"long(-sys.maxint-1) != -sys.maxint-1"
)
# long -> int should not fail for hugepos_aslong or hugeneg_aslong
try
:
check
(
int
(
hugepos_aslong
)
==
hugepos
,
"converting sys.maxint to long and back to int fails"
)
except
OverflowError
:
raise
TestFailed
,
"int(long(sys.maxint)) overflowed!"
try
:
check
(
int
(
hugeneg_aslong
)
==
hugeneg
,
"converting -sys.maxint-1 to long and back to int fails"
)
except
OverflowError
:
raise
TestFailed
,
"int(long(-sys.maxint-1)) overflowed!"
# but long -> int should overflow for hugepos+1 and hugeneg-1
x
=
hugepos_aslong
+
1
try
:
int
(
x
)
raise
ValueError
except
OverflowError
:
pass
except
:
raise
TestFailed
,
"int(long(sys.maxint) + 1) didn't overflow"
x
=
hugeneg_aslong
-
1
try
:
int
(
x
)
raise
ValueError
except
OverflowError
:
pass
except
:
raise
TestFailed
,
"int(long(-sys.maxint-1) - 1) didn't overflow"
# ---------------------------------------------------------------- do it
test_division
()
test_bitop_identities
()
test_format
()
test_misc
()
Lib/dos-8x3/test_mim.py
0 → 100644
View file @
67db4b14
"""Test program for MimeWriter module.
The test program was too big to comfortably fit in the MimeWriter
class, so it's here in its own file.
This should generate Barry's example, modulo some quotes and newlines.
"""
from
MimeWriter
import
MimeWriter
SELLER
=
'''
\
INTERFACE Seller-1;
TYPE Seller = OBJECT
DOCUMENTATION "A simple Seller interface to test ILU"
METHODS
price():INTEGER,
END;
'''
BUYER
=
'''
\
class Buyer:
def __setup__(self, maxprice):
self._maxprice = maxprice
def __main__(self, kos):
"""Entry point upon arrival at a new KOS."""
broker = kos.broker()
# B4 == Barry's Big Bass Business :-)
seller = broker.lookup('Seller_1.Seller', 'B4')
if seller:
price = seller.price()
print 'Seller wants $', price, '... '
if price > self._maxprice:
print 'too much!'
else:
print "I'll take it!"
else:
print 'no seller found here'
'''
# Don't ask why this comment is here
STATE
=
'''
\
# instantiate a buyer instance and put it in a magic place for the KOS
# to find.
__kp__ = Buyer()
__kp__.__setup__(500)
'''
SIMPLE_METADATA
=
[
(
"Interpreter"
,
"python"
),
(
"Interpreter-Version"
,
"1.3"
),
(
"Owner-Name"
,
"Barry Warsaw"
),
(
"Owner-Rendezvous"
,
"bwarsaw@cnri.reston.va.us"
),
(
"Home-KSS"
,
"kss.cnri.reston.va.us"
),
(
"Identifier"
,
"hdl://cnri.kss/my_first_knowbot"
),
(
"Launch-Date"
,
"Mon Feb 12 16:39:03 EST 1996"
),
]
COMPLEX_METADATA
=
[
(
"Metadata-Type"
,
"complex"
),
(
"Metadata-Key"
,
"connection"
),
(
"Access"
,
"read-only"
),
(
"Connection-Description"
,
"Barry's Big Bass Business"
),
(
"Connection-Id"
,
"B4"
),
(
"Connection-Direction"
,
"client"
),
]
EXTERNAL_METADATA
=
[
(
"Metadata-Type"
,
"complex"
),
(
"Metadata-Key"
,
"generic-interface"
),
(
"Access"
,
"read-only"
),
(
"Connection-Description"
,
"Generic Interface for All Knowbots"
),
(
"Connection-Id"
,
"generic-kp"
),
(
"Connection-Direction"
,
"client"
),
]
def
main
():
import
sys
# Toplevel headers
toplevel
=
MimeWriter
(
sys
.
stdout
)
toplevel
.
addheader
(
"From"
,
"bwarsaw@cnri.reston.va.us"
)
toplevel
.
addheader
(
"Date"
,
"Mon Feb 12 17:21:48 EST 1996"
)
toplevel
.
addheader
(
"To"
,
"kss-submit@cnri.reston.va.us"
)
toplevel
.
addheader
(
"MIME-Version"
,
"1.0"
)
# Toplevel body parts
f
=
toplevel
.
startmultipartbody
(
"knowbot"
,
"801spam999"
,
[(
"version"
,
"0.1"
)],
prefix
=
0
)
f
.
write
(
"This is a multi-part message in MIME format.
\
n
"
)
# First toplevel body part: metadata
md
=
toplevel
.
nextpart
()
md
.
startmultipartbody
(
"knowbot-metadata"
,
"802spam999"
)
# Metadata part 1
md1
=
md
.
nextpart
()
md1
.
addheader
(
"KP-Metadata-Type"
,
"simple"
)
md1
.
addheader
(
"KP-Access"
,
"read-only"
)
m
=
MimeWriter
(
md1
.
startbody
(
"message/rfc822"
))
for
key
,
value
in
SIMPLE_METADATA
:
m
.
addheader
(
"KPMD-"
+
key
,
value
)
m
.
flushheaders
()
del
md1
# Metadata part 2
md2
=
md
.
nextpart
()
for
key
,
value
in
COMPLEX_METADATA
:
md2
.
addheader
(
"KP-"
+
key
,
value
)
f
=
md2
.
startbody
(
"text/isl"
)
f
.
write
(
SELLER
)
del
md2
# Metadata part 3
md3
=
md
.
nextpart
()
f
=
md3
.
startbody
(
"message/external-body"
,
[(
"access-type"
,
"URL"
),
(
"URL"
,
"hdl://cnri.kss/generic-knowbot"
)])
m
=
MimeWriter
(
f
)
for
key
,
value
in
EXTERNAL_METADATA
:
md3
.
addheader
(
"KP-"
+
key
,
value
)
md3
.
startbody
(
"text/isl"
)
# Phantom body doesn't need to be written
md
.
lastpart
()
# Second toplevel body part: code
code
=
toplevel
.
nextpart
()
code
.
startmultipartbody
(
"knowbot-code"
,
"803spam999"
)
# Code: buyer program source
buyer
=
code
.
nextpart
()
buyer
.
addheader
(
"KP-Module-Name"
,
"BuyerKP"
)
f
=
buyer
.
startbody
(
"text/plain"
)
f
.
write
(
BUYER
)
code
.
lastpart
()
# Third toplevel body part: state
state
=
toplevel
.
nextpart
()
state
.
addheader
(
"KP-Main-Module"
,
"main"
)
state
.
startmultipartbody
(
"knowbot-state"
,
"804spam999"
)
# State: a bunch of assignments
st
=
state
.
nextpart
()
st
.
addheader
(
"KP-Module-Name"
,
"main"
)
f
=
st
.
startbody
(
"text/plain"
)
f
.
write
(
STATE
)
state
.
lastpart
()
# End toplevel body parts
toplevel
.
lastpart
()
main
()
Lib/dos-8x3/threadin.py
0 → 100644
View file @
67db4b14
# threading.py:
# Proposed new threading module, emulating a subset of Java's threading model
import
sys
import
time
import
thread
import
traceback
import
StringIO
# Rename some stuff so "from threading import *" is safe
_sys
=
sys
del
sys
_time
=
time
.
time
_sleep
=
time
.
sleep
del
time
_start_new_thread
=
thread
.
start_new_thread
_allocate_lock
=
thread
.
allocate_lock
_get_ident
=
thread
.
get_ident
del
thread
_print_exc
=
traceback
.
print_exc
del
traceback
_StringIO
=
StringIO
.
StringIO
del
StringIO
# Debug support (adapted from ihooks.py)
_VERBOSE
=
0
if
__debug__
:
class
_Verbose
:
def
__init__
(
self
,
verbose
=
None
):
if
verbose
is
None
:
verbose
=
_VERBOSE
self
.
__verbose
=
verbose
def
_note
(
self
,
format
,
*
args
):
if
self
.
__verbose
:
format
=
format
%
args
format
=
"%s: %s
\
n
"
%
(
currentThread
().
getName
(),
format
)
_sys
.
stderr
.
write
(
format
)
else
:
# Disable this when using "python -O"
class
_Verbose
:
def
__init__
(
self
,
verbose
=
None
):
pass
def
_note
(
self
,
*
args
):
pass
# Synchronization classes
Lock
=
_allocate_lock
def
RLock
(
*
args
,
**
kwargs
):
return
apply
(
_RLock
,
args
,
kwargs
)
class
_RLock
(
_Verbose
):
def
__init__
(
self
,
verbose
=
None
):
_Verbose
.
__init__
(
self
,
verbose
)
self
.
__block
=
_allocate_lock
()
self
.
__owner
=
None
self
.
__count
=
0
def
__repr__
(
self
):
return
"<%s(%s, %d)>"
%
(
self
.
__class__
.
__name__
,
self
.
__owner
and
self
.
__owner
.
getName
(),
self
.
__count
)
def
acquire
(
self
,
blocking
=
1
):
me
=
currentThread
()
if
self
.
__owner
is
me
:
self
.
__count
=
self
.
__count
+
1
if
__debug__
:
self
.
_note
(
"%s.acquire(%s): recursive success"
,
self
,
blocking
)
return
1
rc
=
self
.
__block
.
acquire
(
blocking
)
if
rc
:
self
.
__owner
=
me
self
.
__count
=
1
if
__debug__
:
self
.
_note
(
"%s.acquire(%s): initial succes"
,
self
,
blocking
)
else
:
if
__debug__
:
self
.
_note
(
"%s.acquire(%s): failure"
,
self
,
blocking
)
return
rc
def
release
(
self
):
me
=
currentThread
()
assert
self
.
__owner
is
me
,
"release() of un-acquire()d lock"
self
.
__count
=
count
=
self
.
__count
-
1
if
not
count
:
self
.
__owner
=
None
self
.
__block
.
release
()
if
__debug__
:
self
.
_note
(
"%s.release(): final release"
,
self
)
else
:
if
__debug__
:
self
.
_note
(
"%s.release(): non-final release"
,
self
)
# Internal methods used by condition variables
def
_acquire_restore
(
self
,
(
count
,
owner
)):
self
.
__block
.
acquire
()
self
.
__count
=
count
self
.
__owner
=
owner
if
__debug__
:
self
.
_note
(
"%s._acquire_restore()"
,
self
)
def
_release_save
(
self
):
if
__debug__
:
self
.
_note
(
"%s._release_save()"
,
self
)
count
=
self
.
__count
self
.
__count
=
0
owner
=
self
.
__owner
self
.
__owner
=
None
self
.
__block
.
release
()
return
(
count
,
owner
)
def
_is_owned
(
self
):
return
self
.
__owner
is
currentThread
()
def
Condition
(
*
args
,
**
kwargs
):
return
apply
(
_Condition
,
args
,
kwargs
)
class
_Condition
(
_Verbose
):
def
__init__
(
self
,
lock
=
None
,
verbose
=
None
):
_Verbose
.
__init__
(
self
,
verbose
)
if
lock
is
None
:
lock
=
RLock
()
self
.
__lock
=
lock
# Export the lock's acquire() and release() methods
self
.
acquire
=
lock
.
acquire
self
.
release
=
lock
.
release
# If the lock defines _release_save() and/or _acquire_restore(),
# these override the default implementations (which just call
# release() and acquire() on the lock). Ditto for _is_owned().
try
:
self
.
_release_save
=
lock
.
_release_save
except
AttributeError
:
pass
try
:
self
.
_acquire_restore
=
lock
.
_acquire_restore
except
AttributeError
:
pass
try
:
self
.
_is_owned
=
lock
.
_is_owned
except
AttributeError
:
pass
self
.
__waiters
=
[]
def
__repr__
(
self
):
return
"<Condition(%s, %d)>"
%
(
self
.
__lock
,
len
(
self
.
__waiters
))
def
_release_save
(
self
):
self
.
__lock
.
release
()
# No state to save
def
_acquire_restore
(
self
,
x
):
self
.
__lock
.
acquire
()
# Ignore saved state
def
_is_owned
(
self
):
if
self
.
__lock
.
acquire
(
0
):
self
.
__lock
.
release
()
return
0
else
:
return
1
def
wait
(
self
,
timeout
=
None
):
me
=
currentThread
()
assert
self
.
_is_owned
(),
"wait() of un-acquire()d lock"
waiter
=
_allocate_lock
()
waiter
.
acquire
()
self
.
__waiters
.
append
(
waiter
)
saved_state
=
self
.
_release_save
()
if
timeout
is
None
:
waiter
.
acquire
()
if
__debug__
:
self
.
_note
(
"%s.wait(): got it"
,
self
)
else
:
endtime
=
_time
()
+
timeout
delay
=
0.000001
# 1 usec
while
1
:
gotit
=
waiter
.
acquire
(
0
)
if
gotit
or
_time
()
>=
endtime
:
break
_sleep
(
delay
)
if
delay
<
1.0
:
delay
=
delay
*
2.0
if
not
gotit
:
if
__debug__
:
self
.
_note
(
"%s.wait(%s): timed out"
,
self
,
timeout
)
try
:
self
.
__waiters
.
remove
(
waiter
)
except
ValueError
:
pass
else
:
if
__debug__
:
self
.
_note
(
"%s.wait(%s): got it"
,
self
,
timeout
)
self
.
_acquire_restore
(
saved_state
)
def
notify
(
self
,
n
=
1
):
me
=
currentThread
()
assert
self
.
_is_owned
(),
"notify() of un-acquire()d lock"
__waiters
=
self
.
__waiters
waiters
=
__waiters
[:
n
]
if
not
waiters
:
if
__debug__
:
self
.
_note
(
"%s.notify(): no waiters"
,
self
)
return
self
.
_note
(
"%s.notify(): notifying %d waiter%s"
,
self
,
n
,
n
!=
1
and
"s"
or
""
)
for
waiter
in
waiters
:
waiter
.
release
()
try
:
__waiters
.
remove
(
waiter
)
except
ValueError
:
pass
def
notifyAll
(
self
):
self
.
notify
(
len
(
self
.
__waiters
))
def
Semaphore
(
*
args
,
**
kwargs
):
return
apply
(
_Semaphore
,
args
,
kwargs
)
class
_Semaphore
(
_Verbose
):
# After Tim Peters' semaphore class, but bnot quite the same (no maximum)
def
__init__
(
self
,
value
=
1
,
verbose
=
None
):
assert
value
>=
0
,
"Semaphore initial value must be >= 0"
_Verbose
.
__init__
(
self
,
verbose
)
self
.
__cond
=
Condition
(
Lock
())
self
.
__value
=
value
def
acquire
(
self
,
blocking
=
1
):
rc
=
0
self
.
__cond
.
acquire
()
while
self
.
__value
==
0
:
if
not
blocking
:
break
self
.
__cond
.
wait
()
else
:
self
.
__value
=
self
.
__value
-
1
rc
=
1
self
.
__cond
.
release
()
return
rc
def
release
(
self
):
self
.
__cond
.
acquire
()
self
.
__value
=
self
.
__value
+
1
self
.
__cond
.
notify
()
self
.
__cond
.
release
()
def
Event
(
*
args
,
**
kwargs
):
return
apply
(
_Event
,
args
,
kwargs
)
class
_Event
(
_Verbose
):
# After Tim Peters' event class (without is_posted())
def
__init__
(
self
,
verbose
=
None
):
_Verbose
.
__init__
(
self
,
verbose
)
self
.
__cond
=
Condition
(
Lock
())
self
.
__flag
=
0
def
isSet
(
self
):
return
self
.
__flag
def
set
(
self
):
self
.
__cond
.
acquire
()
self
.
__flag
=
1
self
.
__cond
.
notifyAll
()
self
.
__cond
.
release
()
def
clear
(
self
):
self
.
__cond
.
acquire
()
self
.
__flag
=
0
self
.
__cond
.
release
()
def
wait
(
self
,
timeout
=
None
):
self
.
__cond
.
acquire
()
if
not
self
.
__flag
:
self
.
__cond
.
wait
(
timeout
)
self
.
__cond
.
release
()
# Helper to generate new thread names
_counter
=
0
def
_newname
(
template
=
"Thread-%d"
):
global
_counter
_counter
=
_counter
+
1
return
template
%
_counter
# Active thread administration
_active_limbo_lock
=
_allocate_lock
()
_active
=
{}
_limbo
=
{}
# Main class for threads
class
Thread
(
_Verbose
):
__initialized
=
0
def
__init__
(
self
,
group
=
None
,
target
=
None
,
name
=
None
,
args
=
(),
kwargs
=
{},
verbose
=
None
):
assert
group
is
None
,
"group argument must be None for now"
_Verbose
.
__init__
(
self
,
verbose
)
self
.
__target
=
target
self
.
__name
=
str
(
name
or
_newname
())
self
.
__args
=
args
self
.
__kwargs
=
kwargs
self
.
__daemonic
=
self
.
_set_daemon
()
self
.
__started
=
0
self
.
__stopped
=
0
self
.
__block
=
Condition
(
Lock
())
self
.
__initialized
=
1
def
_set_daemon
(
self
):
# Overridden in _MainThread and _DummyThread
return
currentThread
().
isDaemon
()
def
__repr__
(
self
):
assert
self
.
__initialized
,
"Thread.__init__() was not called"
status
=
"initial"
if
self
.
__started
:
status
=
"started"
if
self
.
__stopped
:
status
=
"stopped"
if
self
.
__daemonic
:
status
=
status
+
" daemon"
return
"<%s(%s, %s)>"
%
(
self
.
__class__
.
__name__
,
self
.
__name
,
status
)
def
start
(
self
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
assert
not
self
.
__started
,
"thread already started"
if
__debug__
:
self
.
_note
(
"%s.start(): starting thread"
,
self
)
_active_limbo_lock
.
acquire
()
_limbo
[
self
]
=
self
_active_limbo_lock
.
release
()
_start_new_thread
(
self
.
__bootstrap
,
())
self
.
__started
=
1
_sleep
(
0.000001
)
# 1 usec, to let the thread run (Solaris hack)
def
run
(
self
):
if
self
.
__target
:
apply
(
self
.
__target
,
self
.
__args
,
self
.
__kwargs
)
def
__bootstrap
(
self
):
try
:
self
.
__started
=
1
_active_limbo_lock
.
acquire
()
_active
[
_get_ident
()]
=
self
del
_limbo
[
self
]
_active_limbo_lock
.
release
()
if
__debug__
:
self
.
_note
(
"%s.__bootstrap(): thread started"
,
self
)
try
:
self
.
run
()
except
SystemExit
:
if
__debug__
:
self
.
_note
(
"%s.__bootstrap(): raised SystemExit"
,
self
)
except
:
if
__debug__
:
self
.
_note
(
"%s.__bootstrap(): unhandled exception"
,
self
)
s
=
_StringIO
()
_print_exc
(
file
=
s
)
_sys
.
stderr
.
write
(
"Exception in thread %s:
\
n
%s
\
n
"
%
(
self
.
getName
(),
s
.
getvalue
()))
else
:
if
__debug__
:
self
.
_note
(
"%s.__bootstrap(): normal return"
,
self
)
finally
:
self
.
__stop
()
self
.
__delete
()
def
__stop
(
self
):
self
.
__block
.
acquire
()
self
.
__stopped
=
1
self
.
__block
.
notifyAll
()
self
.
__block
.
release
()
def
__delete
(
self
):
_active_limbo_lock
.
acquire
()
del
_active
[
_get_ident
()]
_active_limbo_lock
.
release
()
def
join
(
self
,
timeout
=
None
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
assert
self
.
__started
,
"cannot join thread before it is started"
assert
self
is
not
currentThread
(),
"cannot join current thread"
if
__debug__
:
if
not
self
.
__stopped
:
self
.
_note
(
"%s.join(): waiting until thread stops"
,
self
)
self
.
__block
.
acquire
()
if
timeout
is
None
:
while
not
self
.
__stopped
:
self
.
__block
.
wait
()
if
__debug__
:
self
.
_note
(
"%s.join(): thread stopped"
,
self
)
else
:
deadline
=
_time
()
+
timeout
while
not
self
.
__stopped
:
delay
=
deadline
-
_time
()
if
delay
<=
0
:
if
__debug__
:
self
.
_note
(
"%s.join(): timed out"
,
self
)
break
self
.
__block
.
wait
(
delay
)
else
:
if
__debug__
:
self
.
_note
(
"%s.join(): thread stopped"
,
self
)
self
.
__block
.
release
()
def
getName
(
self
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
return
self
.
__name
def
setName
(
self
,
name
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
self
.
__name
=
str
(
name
)
def
isAlive
(
self
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
return
self
.
__started
and
not
self
.
__stopped
def
isDaemon
(
self
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
return
self
.
__daemonic
def
setDaemon
(
self
,
daemonic
):
assert
self
.
__initialized
,
"Thread.__init__() not called"
assert
not
self
.
__started
,
"cannot set daemon status of active thread"
self
.
__daemonic
=
daemonic
# Special thread class to represent the main thread
# This is garbage collected through an exit handler
class
_MainThread
(
Thread
):
def
__init__
(
self
):
Thread
.
__init__
(
self
,
name
=
"MainThread"
)
self
.
_Thread__started
=
1
_active_limbo_lock
.
acquire
()
_active
[
_get_ident
()]
=
self
_active_limbo_lock
.
release
()
try
:
self
.
__oldexitfunc
=
_sys
.
exitfunc
except
AttributeError
:
self
.
__oldexitfunc
=
None
_sys
.
exitfunc
=
self
.
__exitfunc
def
_set_daemon
(
self
):
return
0
def
__exitfunc
(
self
):
self
.
_Thread__stop
()
t
=
_pickSomeNonDaemonThread
()
if
t
:
if
__debug__
:
self
.
_note
(
"%s: waiting for other threads"
,
self
)
while
t
:
t
.
join
()
t
=
_pickSomeNonDaemonThread
()
if
self
.
__oldexitfunc
:
if
__debug__
:
self
.
_note
(
"%s: calling exit handler"
,
self
)
self
.
__oldexitfunc
()
if
__debug__
:
self
.
_note
(
"%s: exiting"
,
self
)
self
.
_Thread__delete
()
def
_pickSomeNonDaemonThread
():
for
t
in
enumerate
():
if
not
t
.
isDaemon
()
and
t
.
isAlive
():
return
t
return
None
# Dummy thread class to represent threads not started here.
# These aren't garbage collected when they die,
# nor can they be waited for.
# Their purpose is to return *something* from currentThread().
# They are marked as daemon threads so we won't wait for them
# when we exit (conform previous semantics).
class
_DummyThread
(
Thread
):
def
__init__
(
self
):
Thread
.
__init__
(
self
,
name
=
_newname
(
"Dummy-%d"
))
self
.
__Thread_started
=
1
_active_limbo_lock
.
acquire
()
_active
[
_get_ident
()]
=
self
_active_limbo_lock
.
release
()
def
_set_daemon
(
self
):
return
1
def
join
(
self
):
assert
0
,
"cannot join a dummy thread"
# Global API functions
def
currentThread
():
try
:
return
_active
[
_get_ident
()]
except
KeyError
:
print
"currentThread(): no current thread for"
,
_get_ident
()
return
_DummyThread
()
def
activeCount
():
_active_limbo_lock
.
acquire
()
count
=
len
(
_active
)
+
len
(
_limbo
)
_active_limbo_lock
.
release
()
return
count
def
enumerate
():
_active_limbo_lock
.
acquire
()
active
=
_active
.
values
()
+
_limbo
.
values
()
_active_limbo_lock
.
release
()
return
active
# Create the main thread object
_MainThread
()
# Self-test code
def
_test
():
import
random
class
BoundedQueue
(
_Verbose
):
def
__init__
(
self
,
limit
):
_Verbose
.
__init__
(
self
)
self
.
mon
=
RLock
()
self
.
rc
=
Condition
(
self
.
mon
)
self
.
wc
=
Condition
(
self
.
mon
)
self
.
limit
=
limit
self
.
queue
=
[]
def
put
(
self
,
item
):
self
.
mon
.
acquire
()
while
len
(
self
.
queue
)
>=
self
.
limit
:
self
.
_note
(
"put(%s): queue full"
,
item
)
self
.
wc
.
wait
()
self
.
queue
.
append
(
item
)
self
.
_note
(
"put(%s): appended, length now %d"
,
item
,
len
(
self
.
queue
))
self
.
rc
.
notify
()
self
.
mon
.
release
()
def
get
(
self
):
self
.
mon
.
acquire
()
while
not
self
.
queue
:
self
.
_note
(
"get(): queue empty"
)
self
.
rc
.
wait
()
item
=
self
.
queue
[
0
]
del
self
.
queue
[
0
]
self
.
_note
(
"get(): got %s, %d left"
,
item
,
len
(
self
.
queue
))
self
.
wc
.
notify
()
self
.
mon
.
release
()
return
item
class
ProducerThread
(
Thread
):
def
__init__
(
self
,
queue
,
quota
):
Thread
.
__init__
(
self
,
name
=
"Producer"
)
self
.
queue
=
queue
self
.
quota
=
quota
def
run
(
self
):
from
random
import
random
counter
=
0
while
counter
<
self
.
quota
:
counter
=
counter
+
1
self
.
queue
.
put
(
"%s.%d"
%
(
self
.
getName
(),
counter
))
_sleep
(
random
()
*
0.00001
)
class
ConsumerThread
(
Thread
):
def
__init__
(
self
,
queue
,
count
):
Thread
.
__init__
(
self
,
name
=
"Consumer"
)
self
.
queue
=
queue
self
.
count
=
count
def
run
(
self
):
while
self
.
count
>
0
:
item
=
self
.
queue
.
get
()
print
item
self
.
count
=
self
.
count
-
1
import
time
NP
=
3
QL
=
4
NI
=
5
Q
=
BoundedQueue
(
QL
)
P
=
[]
for
i
in
range
(
NP
):
t
=
ProducerThread
(
Q
,
NI
)
t
.
setName
(
"Producer-%d"
%
(
i
+
1
))
P
.
append
(
t
)
C
=
ConsumerThread
(
Q
,
NI
*
NP
)
for
t
in
P
:
t
.
start
()
_sleep
(
0.000001
)
C
.
start
()
for
t
in
P
:
t
.
join
()
C
.
join
()
if
__name__
==
'__main__'
:
_test
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment