gpython_test.py 10.2 KB
Newer Older
1
# -*- coding: utf-8 -*-
2 3
# Copyright (C) 2019-2020  Nexedi SA and Contributors.
#                          Kirill Smelkov <kirr@nexedi.com>
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
#
# This program is free software: you can Use, Study, Modify and Redistribute
# it under the terms of the GNU General Public License version 3, or (at your
# option) any later version, as published by the Free Software Foundation.
#
# You can also Link and Combine this program with other software covered by
# the terms of any of the Free Software licenses or any of the Open Source
# Initiative approved licenses and Convey the resulting work. Corresponding
# source of such a combination shall include the source code for all other
# software used.
#
# This program is distributed WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING file for full licensing terms.
# See https://www.nexedi.com/licensing for rationale and options.

21 22
from __future__ import print_function, absolute_import

Kirill Smelkov's avatar
Kirill Smelkov committed
23
import sys, os, platform, re, golang
24 25
from golang.golang_test import pyout, _pyrun
from subprocess import PIPE
26
from six import PY2
27 28 29
from six.moves import builtins
import pytest

30 31 32 33 34
from os.path import join, dirname, realpath
here     = dirname(__file__)
testdata = join(here, 'testdata')
testprog = join(here, 'testprog')

35 36 37
is_pypy    = (platform.python_implementation() == 'PyPy')
is_cpython = (platform.python_implementation() == 'CPython')

38 39 40
# @gpython_only is marker to run a test only under gpython
gpython_only = pytest.mark.skipif('GPython' not in sys.version, reason="gpython-only test")

41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
# runtime is pytest fixture that yields all variants of should be supported gpython runtimes:
# '' - not specified (gpython should autoselect)
# 'gevent'
# 'threads'
@pytest.fixture(scope="function", params=['', 'gevent', 'threads'])
def runtime(request):
    yield request.param

# gpyenv returns environment appropriate for spawning gpython with
# specified runtime.
def gpyenv(runtime): # -> env
    env = os.environ.copy()
    if runtime != '':
        env['GPYTHON_RUNTIME'] = runtime
    else:
        env.pop('GPYTHON_RUNTIME', None)
    return env

59 60 61 62 63 64 65 66 67 68 69

@gpython_only
def test_defaultencoding_utf8():
    assert sys.getdefaultencoding() == 'utf-8'

@gpython_only
def test_golang_builtins():
    # some direct accesses
    assert go     is golang.go
    assert chan   is golang.chan
    assert select is golang.select
70
    assert error  is golang.error
71 72
    assert b      is golang.b
    assert u      is golang.u
73 74 75 76 77 78 79

    # indirectly verify golang.__all__
    for k in golang.__all__:
        assert getattr(builtins, k) is getattr(golang, k)

@gpython_only
def test_gevent_activated():
80 81 82 83 84 85
    # gpython, by default, acticates gevent.
    # handling of various runtime modes is explicitly tested in test_Xruntime.
    assert_gevent_activated()

def assert_gevent_activated():
    assert 'gevent' in sys.modules
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
    from gevent.monkey import is_module_patched as patched, is_object_patched as obj_patched

    # builtin (gevent: only on py2 - on py3 __import__ uses fine-grained locking)
    if PY2:
        assert obj_patched('__builtin__', '__import__')

    assert patched('socket')
    # patch_socket(dns=True) also patches vvv
    assert obj_patched('socket', 'getaddrinfo')
    assert obj_patched('socket', 'gethostbyname')
    # ...

    assert patched('time')

    assert patched('select')
    import select as select_mod # patch_select(aggressive=True) removes vvv
    assert not hasattr(select_mod, 'epoll')
    assert not hasattr(select_mod, 'kqueue')
    assert not hasattr(select_mod, 'kevent')
    assert not hasattr(select_mod, 'devpoll')

    # XXX on native windows, patch_{os,signal} do nothing currently
    if os.name != 'nt':
        assert patched('os')
        assert patched('signal')

    assert patched('thread' if PY2 else '_thread')
    assert patched('threading')
    assert patched('_threading_local')

    assert patched('ssl')
    assert patched('subprocess')
    #assert patched('sys')       # currently disabled

    if sys.hexversion >= 0x03070000: # >= 3.7.0
        assert patched('queue')

123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
def assert_gevent_not_activated():
    assert 'gevent' not in sys.modules
    from gevent.monkey import is_module_patched as patched, is_object_patched as obj_patched

    assert not patched('socket')
    assert not patched('time')
    assert not patched('select')
    assert not patched('os')
    assert not patched('signal')
    assert not patched('thread' if PY2 else '_thread')
    assert not patched('threading')
    assert not patched('_threading_local')
    assert not patched('ssl')
    assert not patched('subprocess')
    assert not patched('sys')

139

140
@gpython_only
141
def test_executable(runtime):
142
    # sys.executable must point to gpython and we must be able to execute it.
143
    import gevent
144
    assert 'gpython' in sys.executable
145 146 147 148 149 150 151 152 153 154
    ver = pyout(['-c', 'import sys; print(sys.version)'], env=gpyenv(runtime))
    ver = str(ver)
    assert ('[GPython %s]' % golang.__version__) in ver
    if runtime != 'threads':
        assert ('[gevent %s]'  % gevent.__version__)     in ver
        assert ('[threads]')                         not in ver
    else:
        assert ('[gevent ')                          not in ver
        assert ('[threads]')                             in ver

155

156 157 158 159 160
# verify pymain.
#
# !gpython_only to make sure we get the same output when run via pymain (under
# gpython) and plain python (!gpython).
def test_pymain():
161
    from golang import b
162 163

    # interactive
164
    _ = pyout([], stdin=b'import hello\n', cwd=testdata)
165 166
    assert _ == b"hello\nworld\n['']\n"

167
    # -c <command>
168
    _ = pyout(['-c', 'import hello', 'abc', 'def'], cwd=testdata)
169
    assert _ == b"hello\nworld\n['-c', 'abc', 'def']\n"
170 171 172
    # -c<command> should also work
    __ = pyout(['-cimport hello', 'abc', 'def'], cwd=testdata)
    assert __ == _
173

174
    # -m <module>
175
    _ = pyout(['-m', 'hello', 'abc', 'def'], cwd=testdata)
176 177 178
    # realpath rewrites e.g. `local/lib -> lib` if local/lib is symlink
    hellopy = realpath(join(testdata, 'hello.py'))
    assert _ == b"hello\nworld\n['%s', 'abc', 'def']\n" % b(hellopy)
179 180 181
    # -m<module>
    __ = pyout(['-mhello', 'abc', 'def'], cwd=testdata)
    assert __ == _
182 183

    # file
184
    _ = pyout(['testdata/hello.py', 'abc', 'def'], cwd=here)
185
    assert _ == b"hello\nworld\n['testdata/hello.py', 'abc', 'def']\n"
186

Kirill Smelkov's avatar
Kirill Smelkov committed
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
    # -W <opt>
    _ = pyout(['-Werror', '-Whello', '-W', 'ignore::DeprecationWarning',
               'testprog/print_warnings_setup.py'], cwd=here)
    if PY2:
        # py2 threading, which is imported after gpython startup, adds ignore
        # for sys.exc_clear
        _ = grepv(r'ignore:sys.exc_clear:DeprecationWarning:threading:*', _)
    assert _.startswith(
        b"sys.warnoptions: ['error', 'hello', 'ignore::DeprecationWarning']\n\n" + \
        b"warnings.filters:\n" + \
        b"- ignore::DeprecationWarning::*\n" + \
        b"- error::Warning::*\n"), _
    # $PYTHONWARNINGS
    _ = pyout(['testprog/print_warnings_setup.py'], cwd=here,
              envadj={'PYTHONWARNINGS': 'ignore,world,error::SyntaxWarning'})
    if PY2:
        # see ^^^
        _ = grepv(r'ignore:sys.exc_clear:DeprecationWarning:threading:*', _)
    assert _.startswith(
        b"sys.warnoptions: ['ignore', 'world', 'error::SyntaxWarning']\n\n" + \
        b"warnings.filters:\n" + \
        b"- error::SyntaxWarning::*\n" + \
        b"- ignore::Warning::*\n"), _

211 212 213
# verify that pymain sets sys.path in exactly the same way as underlying python does.
@gpython_only
def test_pymain_syspath():
214 215 216 217
    from gpython import _is_buildout_script
    if _is_buildout_script(sys.executable):
        pytest.xfail("with buildout raw underlying interpreter does not have " +
                     "access to installed eggs")
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
    # check verifies that print_syspath output for gpython and underlying python is the same.
    # if path0cwd2realpath=Y, expect realpath('') instead of '' in sys.path[0]
    def check(argv, path0cwd2realpath=False, **kw):
        gpyout   = u(pyout(argv, **kw))
        stdpyout = u(pyout(argv, pyexe=sys._gpy_underlying_executable, **kw))
        gpyoutv   = gpyout.splitlines()
        stdpyoutv = stdpyout.splitlines()
        if path0cwd2realpath:
            assert stdpyoutv[0] == ''
            stdpyoutv[0] = realpath(kw.get('cwd', ''))

        assert gpyoutv == stdpyoutv

    check([], stdin=b'import print_syspath', cwd=testprog)  # interactive
    check(['-c', 'import print_syspath'], cwd=testprog)     # -c
    check(['-m', 'print_syspath'], cwd=testprog,            # -m
            path0cwd2realpath=(PY2 or is_pypy))
    check(['testprog/print_syspath.py'], cwd=here)          # file

237 238 239
# pymain -V/--version
# gpython_only because output differs from !gpython.
@gpython_only
240
def test_pymain_ver(runtime):
241 242 243
    from golang import b
    from gpython import _version_info_str as V
    import gevent
244 245 246 247 248
    vok = 'GPython %s' % golang.__version__
    if runtime != 'threads':
        vok += ' [gevent %s]' % gevent.__version__
    else:
        vok += ' [threads]'
249 250 251 252 253 254 255 256 257 258

    if is_cpython:
        vok += ' / CPython %s' % platform.python_version()
    elif is_pypy:
        vok += ' / PyPy %s / Python %s' % (V(sys.pypy_version_info), V(sys.version_info))
    else:
        vok = sys.version

    vok += '\n'

259
    ret, out, err = _pyrun(['-V'], stdout=PIPE, stderr=PIPE, env=gpyenv(runtime))
260 261
    assert (ret, out, b(err)) == (0, b'', b(vok))

262
    ret, out, err = _pyrun(['--version'], stdout=PIPE, stderr=PIPE, env=gpyenv(runtime))
263
    assert (ret, out, b(err)) == (0, b'', b(vok))
264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284


# verify -X gpython.runtime=...
@gpython_only
def test_Xruntime(runtime):
    env = os.environ.copy()
    env.pop('GPYTHON_RUNTIME', None) # del

    argv = []
    if runtime != '':
        argv += ['-X', 'gpython.runtime='+runtime]
    prog = 'from gpython import gpython_test as t; '
    if runtime != 'threads':
        prog += 't.assert_gevent_activated(); '
    else:
        prog += 't.assert_gevent_not_activated(); '
    prog += 'print("ok")'
    argv += ['-c', prog]

    out = pyout(argv, env=env)
    assert out == b'ok\n'
Kirill Smelkov's avatar
Kirill Smelkov committed
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301


# ---- misc ----

# grepv filters out lines matching pattern from text.
def grepv(pattern, text): # -> text
    if isinstance(text, bytes):
        t = b''
    else:
        t = ''
    p = re.compile(pattern)
    v = []
    for l in text.splitlines(True):
        m = p.search(l)
        if not m:
            v.append(l)
    return t.join(v)