diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index 95e4ac3ba3..b955797dd4 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -42,3 +42,5 @@ ] SessionStatus = Literal["ok", "exited", "crashed", "abnormal"] EndpointType = Literal["store", "envelope"] + + FlushCallback = Callable[[int, float], None] diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 59185c579a..aeb6449ea2 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -12,6 +12,7 @@ from typing_extensions import TypedDict from sentry_sdk.transport import Transport + from sentry_sdk.worker import Worker from sentry_sdk.integrations import Integration from sentry_sdk._types import ( @@ -57,6 +58,7 @@ def __init__( default_integrations=True, # type: bool dist=None, # type: Optional[str] transport=None, # type: Optional[Union[Transport, Type[Transport], Callable[[Event], None]]] + worker=None, # type: Optional[Type[Worker]] sample_rate=1.0, # type: float send_default_pii=False, # type: bool http_proxy=None, # type: Optional[str] diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index 47d9ff6e35..73d26deb71 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -8,7 +8,7 @@ from datetime import datetime, timedelta from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.worker import make_worker from sentry_sdk.envelope import Envelope from sentry_sdk._types import MYPY @@ -126,7 +126,11 @@ def __init__( Transport.__init__(self, options) assert self.parsed_dsn is not None - self._worker = BackgroundWorker() + + worker = make_worker(options) + assert worker is not None + + self._worker = worker self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until = {} # type: Dict[DataCategory, datetime] self._retry = urllib3.util.Retry() diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker/__init__.py similarity index 68% rename from 
sentry_sdk/worker.py rename to sentry_sdk/worker/__init__.py index 8550f1081c..f953d4edb0 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker/__init__.py @@ -9,15 +9,51 @@ from sentry_sdk._types import MYPY if MYPY: - from typing import Any from typing import Optional from typing import Callable + from typing import Dict + from typing import Any + + from sentry_sdk._types import FlushCallback _TERMINATOR = object() -class BackgroundWorker(object): +class Worker(object): + """ + Base interface for task queue based worker. + """ + + @property + def is_alive(self): + # type: () -> bool + return False + + def start(self): + # type: () -> None + """Start task queue processing.""" + raise NotImplementedError() + + def kill(self): + # type: () -> None + """ + Stop task queue processing. Returns immediately. Not useful + for waiting on shutdown for events, use `flush` for that. + """ + + def flush(self, timeout, callback=None): + # type: (float, Optional[FlushCallback]) -> None + """Wait `timeout` seconds for the current tasks to be finished.""" + pass + + def submit(self, callback): + # type: (Callable[[], None]) -> None + """Submit new task to the queue""" + raise NotImplementedError() + + +class BackgroundWorker(Worker): def __init__(self, queue_size=30): # type: (int) -> None check_thread_support() @@ -63,7 +99,7 @@ def start(self): with self._lock: if not self.is_alive: self._thread = threading.Thread( - target=self._target, name="raven-sentry.BackgroundWorker" + target=self._target, name="sentry_sdk.BackgroundWorker" ) self._thread.setDaemon(True) self._thread.start() @@ -71,6 +107,8 @@ def start(self): def kill(self): # type: () -> None + # FIXME: This doesn't actually do what it claims. Putting the terminator + # on the queue means it will execute all the other tasks first. """ Kill worker thread. Returns immediately. Not useful for waiting on shutdown for events, use `flush` for that. 
@@ -87,7 +125,7 @@ def kill(self): self._thread_for_pid = None def flush(self, timeout, callback=None): - # type: (float, Optional[Any]) -> None + # type: (float, Optional[FlushCallback]) -> None logger.debug("background worker got flush request") with self._lock: if self.is_alive and timeout > 0.0: @@ -95,7 +133,7 @@ def flush(self, timeout, callback=None): logger.debug("background worker flushed") def _wait_flush(self, timeout, callback): - # type: (float, Optional[Any]) -> None + # type: (float, Optional[FlushCallback]) -> None initial_timeout = min(0.1, timeout) if not self._timed_queue_join(initial_timeout): pending = self._queue.qsize() @@ -126,3 +164,18 @@ def _target(self): finally: self._queue.task_done() sleep(0) + + +def make_worker(options): + # type: (Dict[str, Any]) -> Optional[Worker] + ref_worker = options["worker"] + + # If no worker is given, we use the background worker class + if ref_worker is None: + return BackgroundWorker() + elif isinstance(ref_worker, Worker): + return ref_worker + elif isinstance(ref_worker, type) and issubclass(ref_worker, Worker): + return ref_worker() + else: + return None diff --git a/sentry_sdk/worker/gevent_worker.py b/sentry_sdk/worker/gevent_worker.py new file mode 100644 index 0000000000..d8c2da0176 --- /dev/null +++ b/sentry_sdk/worker/gevent_worker.py @@ -0,0 +1,114 @@ +from sentry_sdk.integrations import DidNotEnable +from sentry_sdk.worker import Worker +from sentry_sdk.utils import logger + +try: + import gevent # type: ignore + + from gevent.event import Event # type: ignore + from gevent.lock import RLock # type: ignore + from gevent.queue import Empty, Full, JoinableQueue # type: ignore +except ImportError: + raise DidNotEnable("gevent not installed") + +from sentry_sdk._types import MYPY + +if MYPY: + from typing import Optional + from typing import Callable + + from sentry_sdk._types import FlushCallback + + +class GeventWorker(Worker): + def __init__(self, queue_size=30): + # type: (int) -> None + 
self._queue_size = queue_size + self._queue = JoinableQueue(self._queue_size) + self._kill_event = Event() + self._lock = RLock() + self._greenlet = None + + @property + def is_alive(self): + # type: () -> bool + if not self._greenlet: + return False + return not self._greenlet.dead + + def _ensure_greenlet(self): + # type: () -> None + if not self.is_alive: + self.start() + + def start(self): + # type: () -> None + with self._lock: + if not self.is_alive: + self._greenlet = gevent.spawn(_task_loop, self._queue, self._kill_event) + + def kill(self): + # type: () -> None + logger.debug("gevent worker got kill request") + with self._lock: + if self._greenlet: + self._kill_event.set() + self._greenlet.join(timeout=0.5) + if not self._greenlet.dead: + # Forcibly kill greenlet + logger.warning("gevent worker failed to terminate gracefully.") + self._greenlet.kill(block=False) + self._greenlet = None + self._queue = JoinableQueue(self._queue_size) + self._kill_event = Event() + + def flush(self, timeout, callback=None): + # type: (float, Optional[FlushCallback]) -> None + logger.debug("gevent worker got flush request") + with self._lock: + if self.is_alive and timeout > 0.0: + self._wait_flush(timeout, callback) + logger.debug("gevent worker flushed") + + def _wait_flush(self, timeout, callback): + # type: (float, Optional[FlushCallback]) -> None + initial_timeout = min(0.1, timeout) + if not self._queue.join(initial_timeout): + pending = self._queue.qsize() + logger.debug("%d event(s) pending on flush", pending) + if callback is not None: + callback(pending, timeout) + self._queue.join(timeout - initial_timeout) + + def submit(self, callback): + # type: (Callable[[], None]) -> None + self._ensure_greenlet() + try: + self._queue.put_nowait(callback) + except Full: + logger.debug("gevent worker queue full, dropping event") + + +def _task_loop(queue, kill_event): + # type: (JoinableQueue, Event) -> None + while True: + try: + callback = queue.get(timeout=0.1) + if 
kill_event.is_set(): + # NOTE: We want to kill before executing the task, but we also need + # to be able to kill on an empty queue, so we raise Empty to + # avoid code duplication + raise Empty + try: + callback() + except Exception: + logger.error("Failed processing job", exc_info=True) + finally: + queue.task_done() + except Empty: + pass + + if kill_event.is_set(): + logger.debug("gevent worker killed") + break + gevent.sleep(0) diff --git a/setup.py b/setup.py index 59aef3600c..44e5a70827 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ def get_file_text(file_name): with open(os.path.join(here, file_name)) as in_file: return in_file.read() - + setup( name="sentry-sdk", version="0.19.4", @@ -53,6 +53,7 @@ def get_file_text(file_name): "pyspark": ["pyspark>=2.4.4"], "pure_eval": ["pure_eval", "executing", "asttokens"], "chalice": ["chalice>=1.16.0"], + "gevent": ["gevent>=1.4.0"], }, classifiers=[ "Development Status :: 5 - Production/Stable", diff --git a/tests/test_transport.py b/tests/test_transport.py index 96145eb951..2baa11734b 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -9,6 +9,7 @@ from sentry_sdk import Hub, Client, add_breadcrumb, capture_message from sentry_sdk.transport import _parse_rate_limits from sentry_sdk.integrations.logging import LoggingIntegration +from sentry_sdk.worker.gevent_worker import GeventWorker @pytest.fixture @@ -25,6 +26,7 @@ def inner(**kwargs): @pytest.mark.parametrize("debug", (True, False)) @pytest.mark.parametrize("client_flush_method", ["close", "flush"]) @pytest.mark.parametrize("use_pickle", (True, False)) +@pytest.mark.parametrize("worker", (None, GeventWorker)) def test_transport_works( httpserver, request, @@ -34,11 +36,16 @@ def test_transport_works( make_client, client_flush_method, use_pickle, + worker, maybe_monkeypatched_threading, ): + if worker == GeventWorker: + if maybe_monkeypatched_threading != "gevent": + pytest.skip("GeventWorker requires gevent.monkey") + 
httpserver.serve_content("ok", 200) caplog.set_level(logging.DEBUG) - client = make_client(debug=debug) + client = make_client(debug=debug, worker=worker) if use_pickle: client = pickle.loads(pickle.dumps(client)) diff --git a/tests/worker/__init__.py b/tests/worker/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/worker/test_gevent.py b/tests/worker/test_gevent.py new file mode 100644 index 0000000000..4737d01ed8 --- /dev/null +++ b/tests/worker/test_gevent.py @@ -0,0 +1,87 @@ +import gevent +import logging + +from sentry_sdk.worker.gevent_worker import GeventWorker + +try: + from unittest import mock # python 3.3 and above +except ImportError: + import mock # python < 3.3 + + +SKIP_REASON = "gevent missing or broken" + + +def test_start(): + worker = GeventWorker() + worker.start() + assert worker.is_alive + assert worker._greenlet is not None + assert not worker._greenlet.dead + worker.kill() + + +def test_kill(): + worker = GeventWorker() + worker.start() + + greenlet = worker._greenlet + worker.kill() + assert not worker.is_alive + assert worker._greenlet is None + assert greenlet.dead + + +def test_kill_stalling_task(sentry_init, caplog): + sentry_init(debug=True) + worker = GeventWorker() + + task = mock.Mock(side_effect=lambda: gevent.sleep(2)) + worker.submit(task) + assert worker._queue.qsize() == 1 + + gevent.sleep(0) + assert task.called + + worker.kill() + assert not worker.is_alive + assert worker._greenlet is None + assert "gevent worker failed to terminate" in caplog.text + + +def test_submit(sentry_init, caplog): + sentry_init(debug=True) + worker = GeventWorker(queue_size=1) + assert worker._queue.qsize() == 0 + task_1 = mock.Mock() + + worker.submit(task_1) + assert worker._queue.qsize() == 1 + + with caplog.at_level(logging.DEBUG): + task_2 = mock.Mock() + worker.submit(task_2) + assert worker._queue.qsize() == 1 + assert "gevent worker queue full" in caplog.text + + worker.kill() + + +def test_flush(): + worker = 
GeventWorker() + + task_1 = mock.Mock() + worker.submit(task_1) + + task_2 = mock.Mock() + worker.submit(task_2) + assert worker._queue.qsize() == 2 + assert not task_1.called + assert not task_2.called + + worker.flush(0.01) + assert worker._queue.qsize() == 0 + assert task_1.called + assert task_2.called + + worker.kill()