Skip to content

gh-93649: Split watcher API tests from _testcapimodule.c #99532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Modules/_testcapi/parts.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ int _PyTestCapi_Init_PyTime(PyObject *module);
int _PyTestCapi_Init_DateTime(PyObject *module);
int _PyTestCapi_Init_Docstring(PyObject *module);
int _PyTestCapi_Init_Mem(PyObject *module);
int _PyTestCapi_Init_Watchers(PyObject *module);

#ifdef LIMITED_API_AVAILABLE
int _PyTestCapi_Init_VectorcallLimited(PyObject *module);
Expand Down
302 changes: 302 additions & 0 deletions Modules/_testcapi/watchers.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
#include "parts.h"


// Test dict watching
static PyObject *g_dict_watch_events;
static int g_dict_watchers_installed;

static int
dict_watch_callback(PyDict_WatchEvent event,
PyObject *dict,
PyObject *key,
PyObject *new_value)
{
PyObject *msg;
switch (event) {
case PyDict_EVENT_CLEARED:
msg = PyUnicode_FromString("clear");
break;
case PyDict_EVENT_DEALLOCATED:
msg = PyUnicode_FromString("dealloc");
break;
case PyDict_EVENT_CLONED:
msg = PyUnicode_FromString("clone");
break;
case PyDict_EVENT_ADDED:
msg = PyUnicode_FromFormat("new:%S:%S", key, new_value);
break;
case PyDict_EVENT_MODIFIED:
msg = PyUnicode_FromFormat("mod:%S:%S", key, new_value);
break;
case PyDict_EVENT_DELETED:
msg = PyUnicode_FromFormat("del:%S", key);
break;
default:
msg = PyUnicode_FromString("unknown");
}
if (msg == NULL) {
return -1;
}
assert(PyList_Check(g_dict_watch_events));
if (PyList_Append(g_dict_watch_events, msg) < 0) {
Py_DECREF(msg);
return -1;
}
Py_DECREF(msg);
return 0;
}

static int
dict_watch_callback_second(PyDict_WatchEvent event,
PyObject *dict,
PyObject *key,
PyObject *new_value)
{
PyObject *msg = PyUnicode_FromString("second");
if (msg == NULL) {
return -1;
}
int rc = PyList_Append(g_dict_watch_events, msg);
Py_DECREF(msg);
if (rc < 0) {
return -1;
}
return 0;
}

static int
dict_watch_callback_error(PyDict_WatchEvent event,
PyObject *dict,
PyObject *key,
PyObject *new_value)
{
PyErr_SetString(PyExc_RuntimeError, "boom!");
return -1;
}

static PyObject *
add_dict_watcher(PyObject *self, PyObject *kind)
{
int watcher_id;
assert(PyLong_Check(kind));
long kind_l = PyLong_AsLong(kind);
if (kind_l == 2) {
watcher_id = PyDict_AddWatcher(dict_watch_callback_second);
}
else if (kind_l == 1) {
watcher_id = PyDict_AddWatcher(dict_watch_callback_error);
}
else {
watcher_id = PyDict_AddWatcher(dict_watch_callback);
}
if (watcher_id < 0) {
return NULL;
}
if (!g_dict_watchers_installed) {
assert(!g_dict_watch_events);
if (!(g_dict_watch_events = PyList_New(0))) {
return NULL;
}
}
g_dict_watchers_installed++;
return PyLong_FromLong(watcher_id);
}

static PyObject *
clear_dict_watcher(PyObject *self, PyObject *watcher_id)
{
if (PyDict_ClearWatcher(PyLong_AsLong(watcher_id))) {
return NULL;
}
g_dict_watchers_installed--;
if (!g_dict_watchers_installed) {
assert(g_dict_watch_events);
Py_CLEAR(g_dict_watch_events);
}
Py_RETURN_NONE;
}

static PyObject *
watch_dict(PyObject *self, PyObject *args)
{
PyObject *dict;
int watcher_id;
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
return NULL;
}
if (PyDict_Watch(watcher_id, dict)) {
return NULL;
}
Py_RETURN_NONE;
}

static PyObject *
unwatch_dict(PyObject *self, PyObject *args)
{
PyObject *dict;
int watcher_id;
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &dict)) {
return NULL;
}
if (PyDict_Unwatch(watcher_id, dict)) {
return NULL;
}
Py_RETURN_NONE;
}

static PyObject *
get_dict_watcher_events(PyObject *self, PyObject *Py_UNUSED(args))
{
if (!g_dict_watch_events) {
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
return NULL;
}
return Py_NewRef(g_dict_watch_events);
}

// Test type watchers
static PyObject *g_type_modified_events;
static int g_type_watchers_installed;

static int
type_modified_callback(PyTypeObject *type)
{
assert(PyList_Check(g_type_modified_events));
if(PyList_Append(g_type_modified_events, (PyObject *)type) < 0) {
return -1;
}
return 0;
}

static int
type_modified_callback_wrap(PyTypeObject *type)
{
assert(PyList_Check(g_type_modified_events));
PyObject *list = PyList_New(0);
if (list == NULL) {
return -1;
}
if (PyList_Append(list, (PyObject *)type) < 0) {
Py_DECREF(list);
return -1;
}
if (PyList_Append(g_type_modified_events, list) < 0) {
Py_DECREF(list);
return -1;
}
Py_DECREF(list);
return 0;
}

static int
type_modified_callback_error(PyTypeObject *type)
{
PyErr_SetString(PyExc_RuntimeError, "boom!");
return -1;
}

static PyObject *
add_type_watcher(PyObject *self, PyObject *kind)
{
int watcher_id;
assert(PyLong_Check(kind));
long kind_l = PyLong_AsLong(kind);
if (kind_l == 2) {
watcher_id = PyType_AddWatcher(type_modified_callback_wrap);
}
else if (kind_l == 1) {
watcher_id = PyType_AddWatcher(type_modified_callback_error);
}
else {
watcher_id = PyType_AddWatcher(type_modified_callback);
}
if (watcher_id < 0) {
return NULL;
}
if (!g_type_watchers_installed) {
assert(!g_type_modified_events);
if (!(g_type_modified_events = PyList_New(0))) {
return NULL;
}
}
g_type_watchers_installed++;
return PyLong_FromLong(watcher_id);
}

static PyObject *
clear_type_watcher(PyObject *self, PyObject *watcher_id)
{
if (PyType_ClearWatcher(PyLong_AsLong(watcher_id))) {
return NULL;
}
g_type_watchers_installed--;
if (!g_type_watchers_installed) {
assert(g_type_modified_events);
Py_CLEAR(g_type_modified_events);
}
Py_RETURN_NONE;
}

static PyObject *
get_type_modified_events(PyObject *self, PyObject *Py_UNUSED(args))
{
if (!g_type_modified_events) {
PyErr_SetString(PyExc_RuntimeError, "no watchers active");
return NULL;
}
return Py_NewRef(g_type_modified_events);
}

static PyObject *
watch_type(PyObject *self, PyObject *args)
{
PyObject *type;
int watcher_id;
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
return NULL;
}
if (PyType_Watch(watcher_id, type)) {
return NULL;
}
Py_RETURN_NONE;
}

static PyObject *
unwatch_type(PyObject *self, PyObject *args)
{
PyObject *type;
int watcher_id;
if (!PyArg_ParseTuple(args, "iO", &watcher_id, &type)) {
return NULL;
}
if (PyType_Unwatch(watcher_id, type)) {
return NULL;
}
Py_RETURN_NONE;
}

static PyMethodDef test_methods[] = {
// Dict watchers.
{"add_dict_watcher", add_dict_watcher, METH_O, NULL},
{"clear_dict_watcher", clear_dict_watcher, METH_O, NULL},
{"watch_dict", watch_dict, METH_VARARGS, NULL},
{"unwatch_dict", unwatch_dict, METH_VARARGS, NULL},
{"get_dict_watcher_events", get_dict_watcher_events, METH_NOARGS, NULL},

// Type watchers.
{"add_type_watcher", add_type_watcher, METH_O, NULL},
{"clear_type_watcher", clear_type_watcher, METH_O, NULL},
{"watch_type", watch_type, METH_VARARGS, NULL},
{"unwatch_type", unwatch_type, METH_VARARGS, NULL},
{"get_type_modified_events", get_type_modified_events, METH_NOARGS, NULL},
{NULL},
};

int
_PyTestCapi_Init_Watchers(PyObject *mod)
{
if (PyModule_AddFunctions(mod, test_methods) < 0) {
return -1;
}
return 0;
}
Loading