Commit 6a150bca authored by Eric Snow's avatar Eric Snow Committed by GitHub

bpo-33608: Factor out a private, per-interpreter _Py_AddPendingCall(). (gh-13714)

parent 218e47b6
...@@ -12,19 +12,22 @@ extern "C" { ...@@ -12,19 +12,22 @@ extern "C" {
#include "pycore_pystate.h" #include "pycore_pystate.h"
#include "pythread.h" #include "pythread.h"
PyAPI_FUNC(void) _Py_FinishPendingCalls(_PyRuntimeState *runtime);
PyAPI_FUNC(void) _PyEval_Initialize(struct _ceval_runtime_state *); PyAPI_FUNC(void) _PyEval_Initialize(struct _ceval_runtime_state *);
PyAPI_FUNC(void) _PyEval_FiniThreads( PyAPI_FUNC(void) _PyEval_FiniThreads(
struct _ceval_runtime_state *ceval); struct _ceval_runtime_state *);
PyAPI_FUNC(void) _PyEval_SignalReceived( PyAPI_FUNC(void) _PyEval_SignalReceived(
struct _ceval_runtime_state *ceval); struct _ceval_runtime_state *);
PyAPI_FUNC(int) _PyEval_AddPendingCall( PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyThreadState *tstate, PyThreadState *tstate,
struct _ceval_runtime_state *ceval, struct _ceval_runtime_state *,
struct _ceval_interpreter_state *,
unsigned long thread_id,
int (*func)(void *), int (*func)(void *),
void *arg); void *arg);
PyAPI_FUNC(void) _PyEval_FinishPendingCalls(PyInterpreterState *);
PyAPI_FUNC(void) _PyEval_SignalAsyncExc( PyAPI_FUNC(void) _PyEval_SignalAsyncExc(
struct _ceval_runtime_state *ceval); struct _ceval_runtime_state *,
struct _ceval_interpreter_state *);
PyAPI_FUNC(void) _PyEval_ReInitThreads( PyAPI_FUNC(void) _PyEval_ReInitThreads(
_PyRuntimeState *runtime); _PyRuntimeState *runtime);
......
...@@ -25,7 +25,7 @@ struct pyruntimestate; ...@@ -25,7 +25,7 @@ struct pyruntimestate;
/* ceval state */ /* ceval state */
struct _pending_calls { struct _ceval_pending_calls {
int finishing; int finishing;
PyThread_type_lock lock; PyThread_type_lock lock;
/* Request for running pending calls. */ /* Request for running pending calls. */
...@@ -36,6 +36,7 @@ struct _pending_calls { ...@@ -36,6 +36,7 @@ struct _pending_calls {
int async_exc; int async_exc;
#define NPENDINGCALLS 32 #define NPENDINGCALLS 32
struct { struct {
unsigned long thread_id;
int (*func)(void *); int (*func)(void *);
void *arg; void *arg;
} calls[NPENDINGCALLS]; } calls[NPENDINGCALLS];
...@@ -53,15 +54,21 @@ struct _ceval_runtime_state { ...@@ -53,15 +54,21 @@ struct _ceval_runtime_state {
int tracing_possible; int tracing_possible;
/* This single variable consolidates all requests to break out of /* This single variable consolidates all requests to break out of
the fast path in the eval loop. */ the fast path in the eval loop. */
// XXX This can move to _ceval_interpreter_state once all parts
// from COMPUTE_EVAL_BREAKER have moved under PyInterpreterState.
_Py_atomic_int eval_breaker; _Py_atomic_int eval_breaker;
/* Request for dropping the GIL */ /* Request for dropping the GIL */
_Py_atomic_int gil_drop_request; _Py_atomic_int gil_drop_request;
struct _pending_calls pending;
/* Request for checking signals. */ /* Request for checking signals. */
_Py_atomic_int signals_pending; _Py_atomic_int signals_pending;
struct _gil_runtime_state gil; struct _gil_runtime_state gil;
}; };
struct _ceval_interpreter_state {
struct _ceval_pending_calls pending;
};
/* interpreter state */ /* interpreter state */
typedef PyObject* (*_PyFrameEvalFunction)(struct _frame *, int); typedef PyObject* (*_PyFrameEvalFunction)(struct _frame *, int);
...@@ -136,6 +143,7 @@ struct _is { ...@@ -136,6 +143,7 @@ struct _is {
uint64_t tstate_next_unique_id; uint64_t tstate_next_unique_id;
struct _ceval_interpreter_state ceval;
struct _warnings_runtime_state warnings; struct _warnings_runtime_state warnings;
PyObject *audit_hooks; PyObject *audit_hooks;
......
...@@ -431,7 +431,7 @@ class TestPendingCalls(unittest.TestCase): ...@@ -431,7 +431,7 @@ class TestPendingCalls(unittest.TestCase):
def test_pendingcalls_threaded(self): def test_pendingcalls_threaded(self):
#do every callback on a separate thread #do every callback on a separate thread
n = 32 #total callbacks n = 32 #total callbacks (see NPENDINGCALLS in pycore_ceval.h)
threads = [] threads = []
class foo(object):pass class foo(object):pass
context = foo() context = foo()
......
We added a new internal _Py_AddPendingCall() that operates relative to the
provided interpreter. This allows us to use the existing implementation to
ask another interpreter to do work that cannot be done in the current
interpreter, like decref an object the other interpreter owns. The existing
Py_AddPendingCall() only operates relative to the main interpreter.
...@@ -2677,6 +2677,7 @@ pending_threadfunc(PyObject *self, PyObject *arg) ...@@ -2677,6 +2677,7 @@ pending_threadfunc(PyObject *self, PyObject *arg)
Py_INCREF(callable); Py_INCREF(callable);
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
/* XXX Use the internal _Py_AddPendingCall(). */
r = Py_AddPendingCall(&_pending_callback, callable); r = Py_AddPendingCall(&_pending_callback, callable);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <process.h> #include <process.h>
#endif #endif
#endif #endif
#include "internal/pycore_pystate.h"
#ifdef HAVE_SIGNAL_H #ifdef HAVE_SIGNAL_H
#include <signal.h> #include <signal.h>
...@@ -259,6 +260,7 @@ trip_signal(int sig_num) ...@@ -259,6 +260,7 @@ trip_signal(int sig_num)
/* Notify ceval.c */ /* Notify ceval.c */
_PyRuntimeState *runtime = &_PyRuntime; _PyRuntimeState *runtime = &_PyRuntime;
PyThreadState *tstate = _PyRuntimeState_GetThreadState(runtime); PyThreadState *tstate = _PyRuntimeState_GetThreadState(runtime);
PyInterpreterState *interp = runtime->interpreters.main;
_PyEval_SignalReceived(&runtime->ceval); _PyEval_SignalReceived(&runtime->ceval);
/* And then write to the wakeup fd *after* setting all the globals and /* And then write to the wakeup fd *after* setting all the globals and
...@@ -299,7 +301,10 @@ trip_signal(int sig_num) ...@@ -299,7 +301,10 @@ trip_signal(int sig_num)
{ {
/* Py_AddPendingCall() isn't signal-safe, but we /* Py_AddPendingCall() isn't signal-safe, but we
still use it for this exceptional case. */ still use it for this exceptional case. */
_PyEval_AddPendingCall(tstate, &runtime->ceval, _PyEval_AddPendingCall(tstate,
&runtime->ceval,
&interp->ceval,
runtime->main_thread,
report_wakeup_send_error, report_wakeup_send_error,
(void *)(intptr_t) last_error); (void *)(intptr_t) last_error);
} }
...@@ -318,7 +323,10 @@ trip_signal(int sig_num) ...@@ -318,7 +323,10 @@ trip_signal(int sig_num)
{ {
/* Py_AddPendingCall() isn't signal-safe, but we /* Py_AddPendingCall() isn't signal-safe, but we
still use it for this exceptional case. */ still use it for this exceptional case. */
_PyEval_AddPendingCall(tstate, &runtime->ceval, _PyEval_AddPendingCall(tstate,
&runtime->ceval,
&interp->ceval,
runtime->main_thread,
report_wakeup_write_error, report_wakeup_write_error,
(void *)(intptr_t)errno); (void *)(intptr_t)errno);
} }
......
This diff is collapsed.
...@@ -141,9 +141,11 @@ static void recreate_gil(struct _gil_runtime_state *gil) ...@@ -141,9 +141,11 @@ static void recreate_gil(struct _gil_runtime_state *gil)
} }
static void static void
drop_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate) drop_gil(struct _ceval_runtime_state *ceval_r,
struct _ceval_interpreter_state *ceval_i,
PyThreadState *tstate)
{ {
struct _gil_runtime_state *gil = &ceval->gil; struct _gil_runtime_state *gil = &ceval_r->gil;
if (!_Py_atomic_load_relaxed(&gil->locked)) { if (!_Py_atomic_load_relaxed(&gil->locked)) {
Py_FatalError("drop_gil: GIL is not locked"); Py_FatalError("drop_gil: GIL is not locked");
} }
...@@ -163,12 +165,12 @@ drop_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate) ...@@ -163,12 +165,12 @@ drop_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate)
MUTEX_UNLOCK(gil->mutex); MUTEX_UNLOCK(gil->mutex);
#ifdef FORCE_SWITCHING #ifdef FORCE_SWITCHING
if (_Py_atomic_load_relaxed(&ceval->gil_drop_request) && tstate != NULL) { if (_Py_atomic_load_relaxed(&ceval_r->gil_drop_request) && tstate != NULL) {
MUTEX_LOCK(gil->switch_mutex); MUTEX_LOCK(gil->switch_mutex);
/* Not switched yet => wait */ /* Not switched yet => wait */
if (((PyThreadState*)_Py_atomic_load_relaxed(&gil->last_holder)) == tstate) if (((PyThreadState*)_Py_atomic_load_relaxed(&gil->last_holder)) == tstate)
{ {
RESET_GIL_DROP_REQUEST(ceval); RESET_GIL_DROP_REQUEST(ceval_r, ceval_i);
/* NOTE: if COND_WAIT does not atomically start waiting when /* NOTE: if COND_WAIT does not atomically start waiting when
releasing the mutex, another thread can run through, take releasing the mutex, another thread can run through, take
the GIL and drop it again, and reset the condition the GIL and drop it again, and reset the condition
...@@ -181,13 +183,19 @@ drop_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate) ...@@ -181,13 +183,19 @@ drop_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate)
} }
static void static void
take_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate) take_gil(struct _ceval_runtime_state *ceval_r,
PyThreadState *tstate)
{ {
if (tstate == NULL) { if (tstate == NULL) {
Py_FatalError("take_gil: NULL tstate"); Py_FatalError("take_gil: NULL tstate");
} }
PyInterpreterState *interp = tstate->interp;
if (interp == NULL) {
Py_FatalError("take_gil: NULL interp");
}
struct _ceval_interpreter_state *ceval_i = &interp->ceval;
struct _gil_runtime_state *gil = &ceval->gil; struct _gil_runtime_state *gil = &ceval_r->gil;
int err = errno; int err = errno;
MUTEX_LOCK(gil->mutex); MUTEX_LOCK(gil->mutex);
...@@ -210,7 +218,7 @@ take_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate) ...@@ -210,7 +218,7 @@ take_gil(struct _ceval_runtime_state *ceval, PyThreadState *tstate)
_Py_atomic_load_relaxed(&gil->locked) && _Py_atomic_load_relaxed(&gil->locked) &&
gil->switch_number == saved_switchnum) gil->switch_number == saved_switchnum)
{ {
SET_GIL_DROP_REQUEST(ceval); SET_GIL_DROP_REQUEST(ceval_r);
} }
} }
_ready: _ready:
...@@ -232,11 +240,11 @@ _ready: ...@@ -232,11 +240,11 @@ _ready:
COND_SIGNAL(gil->switch_cond); COND_SIGNAL(gil->switch_cond);
MUTEX_UNLOCK(gil->switch_mutex); MUTEX_UNLOCK(gil->switch_mutex);
#endif #endif
if (_Py_atomic_load_relaxed(&ceval->gil_drop_request)) { if (_Py_atomic_load_relaxed(&ceval_r->gil_drop_request)) {
RESET_GIL_DROP_REQUEST(ceval); RESET_GIL_DROP_REQUEST(ceval_r, ceval_i);
} }
if (tstate->async_exc != NULL) { if (tstate->async_exc != NULL) {
_PyEval_SignalAsyncExc(ceval); _PyEval_SignalAsyncExc(ceval_r, ceval_i);
} }
MUTEX_UNLOCK(gil->mutex); MUTEX_UNLOCK(gil->mutex);
......
...@@ -1147,15 +1147,31 @@ Py_FinalizeEx(void) ...@@ -1147,15 +1147,31 @@ Py_FinalizeEx(void)
return status; return status;
} }
/* Get current thread state and interpreter pointer */
PyThreadState *tstate = _PyRuntimeState_GetThreadState(runtime);
PyInterpreterState *interp = tstate->interp;
// Wrap up existing "threading"-module-created, non-daemon threads. // Wrap up existing "threading"-module-created, non-daemon threads.
wait_for_thread_shutdown(); wait_for_thread_shutdown();
// Make any remaining pending calls. // Make any remaining pending calls.
_Py_FinishPendingCalls(runtime); /* XXX For the moment we are going to ignore lingering pending calls.
* We've seen sporadic on some of the buildbots during finalization
/* Get current thread state and interpreter pointer */ * with the changes for per-interpreter pending calls (see bpo-33608),
PyThreadState *tstate = _PyRuntimeState_GetThreadState(runtime); * meaning the previous _PyEval_FinishPendincCalls() call here is
PyInterpreterState *interp = tstate->interp; * a trigger, if not responsible.
*
* Ignoring pending calls at this point in the runtime lifecycle
* is okay (for now) for the following reasons:
*
* * pending calls are still not a widely-used feature
* * this only affects runtime finalization, where the process is
* likely to end soon anyway (except for some embdding cases)
*
* See bpo-37127 about resolving the problem. Ultimately the call
* here should be re-enabled.
*/
//_PyEval_FinishPendingCalls(interp);
/* The interpreter is still entirely intact at this point, and the /* The interpreter is still entirely intact at this point, and the
* exit funcs may be relying on that. In particular, if some thread * exit funcs may be relying on that. In particular, if some thread
...@@ -1580,6 +1596,9 @@ Py_EndInterpreter(PyThreadState *tstate) ...@@ -1580,6 +1596,9 @@ Py_EndInterpreter(PyThreadState *tstate)
// Wrap up existing "threading"-module-created, non-daemon threads. // Wrap up existing "threading"-module-created, non-daemon threads.
wait_for_thread_shutdown(); wait_for_thread_shutdown();
// Make any remaining pending calls.
_PyEval_FinishPendingCalls(interp);
call_py_exitfuncs(interp); call_py_exitfuncs(interp);
if (tstate != interp->tstate_head || tstate->next != NULL) if (tstate != interp->tstate_head || tstate->next != NULL)
......
...@@ -218,6 +218,13 @@ PyInterpreterState_New(void) ...@@ -218,6 +218,13 @@ PyInterpreterState_New(void)
return NULL; return NULL;
} }
interp->ceval.pending.lock = PyThread_allocate_lock();
if (interp->ceval.pending.lock == NULL) {
PyErr_SetString(PyExc_RuntimeError,
"failed to create interpreter ceval pending mutex");
return NULL;
}
interp->eval_frame = _PyEval_EvalFrameDefault; interp->eval_frame = _PyEval_EvalFrameDefault;
#ifdef HAVE_DLOPEN #ifdef HAVE_DLOPEN
#if HAVE_DECL_RTLD_NOW #if HAVE_DECL_RTLD_NOW
...@@ -345,6 +352,10 @@ PyInterpreterState_Delete(PyInterpreterState *interp) ...@@ -345,6 +352,10 @@ PyInterpreterState_Delete(PyInterpreterState *interp)
if (interp->id_mutex != NULL) { if (interp->id_mutex != NULL) {
PyThread_free_lock(interp->id_mutex); PyThread_free_lock(interp->id_mutex);
} }
if (interp->ceval.pending.lock != NULL) {
PyThread_free_lock(interp->ceval.pending.lock);
interp->ceval.pending.lock = NULL;
}
PyMem_RawFree(interp); PyMem_RawFree(interp);
} }
...@@ -1014,7 +1025,7 @@ PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc) ...@@ -1014,7 +1025,7 @@ PyThreadState_SetAsyncExc(unsigned long id, PyObject *exc)
p->async_exc = exc; p->async_exc = exc;
HEAD_UNLOCK(runtime); HEAD_UNLOCK(runtime);
Py_XDECREF(old_exc); Py_XDECREF(old_exc);
_PyEval_SignalAsyncExc(&runtime->ceval); _PyEval_SignalAsyncExc(&runtime->ceval, &interp->ceval);
return 1; return 1;
} }
} }
...@@ -1444,7 +1455,7 @@ _PyObject_GetCrossInterpreterData(PyObject *obj, _PyCrossInterpreterData *data) ...@@ -1444,7 +1455,7 @@ _PyObject_GetCrossInterpreterData(PyObject *obj, _PyCrossInterpreterData *data)
return 0; return 0;
} }
static void static int
_release_xidata(void *arg) _release_xidata(void *arg)
{ {
_PyCrossInterpreterData *data = (_PyCrossInterpreterData *)arg; _PyCrossInterpreterData *data = (_PyCrossInterpreterData *)arg;
...@@ -1452,42 +1463,21 @@ _release_xidata(void *arg) ...@@ -1452,42 +1463,21 @@ _release_xidata(void *arg)
data->free(data->data); data->free(data->data);
} }
Py_XDECREF(data->obj); Py_XDECREF(data->obj);
} PyMem_Free(data);
return 0;
static void
_call_in_interpreter(struct _gilstate_runtime_state *gilstate,
PyInterpreterState *interp,
void (*func)(void *), void *arg)
{
/* We would use Py_AddPendingCall() if it weren't specific to the
* main interpreter (see bpo-33608). In the meantime we take a
* naive approach.
*/
PyThreadState *save_tstate = NULL;
if (interp != _PyRuntimeGILState_GetThreadState(gilstate)->interp) {
// XXX Using the "head" thread isn't strictly correct.
PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
// XXX Possible GILState issues?
save_tstate = _PyThreadState_Swap(gilstate, tstate);
}
func(arg);
// Switch back.
if (save_tstate != NULL) {
_PyThreadState_Swap(gilstate, save_tstate);
}
} }
void void
_PyCrossInterpreterData_Release(_PyCrossInterpreterData *data) _PyCrossInterpreterData_Release(_PyCrossInterpreterData *data)
{ {
_PyRuntimeState *runtime = &_PyRuntime;
if (data->data == NULL && data->obj == NULL) { if (data->data == NULL && data->obj == NULL) {
// Nothing to release! // Nothing to release!
return; return;
} }
// Switch to the original interpreter. // Get the original interpreter.
PyInterpreterState *interp = _PyInterpreterState_LookUpID(data->interp); PyInterpreterState *interp = _PyInterpreterState_LookUpID(data->interp);
if (interp == NULL) { if (interp == NULL) {
// The intepreter was already destroyed. // The intepreter was already destroyed.
...@@ -1496,10 +1486,28 @@ _PyCrossInterpreterData_Release(_PyCrossInterpreterData *data) ...@@ -1496,10 +1486,28 @@ _PyCrossInterpreterData_Release(_PyCrossInterpreterData *data)
} }
return; return;
} }
// XXX There's an ever-so-slight race here...
if (interp->finalizing) {
// XXX Someone leaked some memory...
return;
}
// "Release" the data and/or the object. // "Release" the data and/or the object.
struct _gilstate_runtime_state *gilstate = &_PyRuntime.gilstate; _PyCrossInterpreterData *copied = PyMem_Malloc(sizeof(_PyCrossInterpreterData));
_call_in_interpreter(gilstate, interp, _release_xidata, data); if (copied == NULL) {
PyErr_SetString(PyExc_MemoryError,
"Not enough memory to preserve cross-interpreter data");
PyErr_Print();
return;
}
memcpy(copied, data, sizeof(_PyCrossInterpreterData));
PyThreadState *tstate = _PyRuntimeState_GetThreadState(runtime);
int res = _PyEval_AddPendingCall(tstate,
&runtime->ceval, &interp->ceval,
0, _release_xidata, copied);
if (res != 0) {
// XXX Queue full or couldn't get lock. Try again somehow?
}
} }
PyObject * PyObject *
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment