
Adds gevent based worker as alternative. #943

Closed · wants to merge 1 commit

2 changes: 2 additions & 0 deletions sentry_sdk/_types.py
@@ -42,3 +42,5 @@
]
SessionStatus = Literal["ok", "exited", "crashed", "abnormal"]
EndpointType = Literal["store", "envelope"]

FlushCallback = Callable[[int, float], None]
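
For reference (not part of the diff): FlushCallback is the signature that flush() passes through to its callback, i.e. the number of pending events and the overall flush timeout, as used in _wait_flush further down. A minimal sketch of such a callback, assuming the existing top-level sentry_sdk.flush(timeout, callback) API forwards it unchanged:

import sentry_sdk

def log_pending(pending, timeout):
    # type: (int, float) -> None
    # Matches FlushCallback: called with the number of events still queued
    # and the overall flush timeout in seconds.
    print("flush: %d event(s) pending, waiting up to %.1fs" % (pending, timeout))

sentry_sdk.flush(timeout=2.0, callback=log_pending)
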
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
@@ -12,6 +12,7 @@
from typing_extensions import TypedDict

from sentry_sdk.transport import Transport
from sentry_sdk.worker import Worker
from sentry_sdk.integrations import Integration

from sentry_sdk._types import (
@@ -57,6 +58,7 @@ def __init__(
default_integrations=True, # type: bool
dist=None, # type: Optional[str]
transport=None, # type: Optional[Union[Transport, Type[Transport], Callable[[Event], None]]]
worker=None, # type: Optional[Type[Worker]]
sample_rate=1.0, # type: float
send_default_pii=False, # type: bool
http_proxy=None, # type: Optional[str]
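
Not part of the diff: a sketch of how the new worker option could be passed at init time, mirroring how the transport option already accepts either a class or an instance. The DSN below is a placeholder.

import sentry_sdk
from sentry_sdk.worker.gevent_worker import GeventWorker

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    worker=GeventWorker,  # a Worker subclass; make_worker() instantiates it
)

Passing an already-constructed instance, e.g. worker=GeventWorker(queue_size=100), should work as well, since make_worker() accepts both forms.
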
8 changes: 6 additions & 2 deletions sentry_sdk/transport.py
@@ -8,7 +8,7 @@
from datetime import datetime, timedelta

from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps
from sentry_sdk.worker import BackgroundWorker
from sentry_sdk.worker import make_worker
from sentry_sdk.envelope import Envelope

from sentry_sdk._types import MYPY
@@ -126,7 +126,11 @@ def __init__(

Transport.__init__(self, options)
assert self.parsed_dsn is not None
self._worker = BackgroundWorker()

worker = make_worker(options)
assert worker is not None

self._worker = worker
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until = {} # type: Dict[DataCategory, datetime]
self._retry = urllib3.util.Retry()
63 changes: 58 additions & 5 deletions sentry_sdk/worker.py → sentry_sdk/worker/__init__.py
@@ -9,15 +9,51 @@
from sentry_sdk._types import MYPY

if MYPY:
from typing import Any
from typing import Optional
from typing import Callable
from typing import Dict
from typing import Any

from sentry_sdk._types import FlushCallback


_TERMINATOR = object()


class BackgroundWorker(object):
class Worker(object):
"""
Base interface for a task-queue-based worker.
"""

@property
def is_alive(self):
# type: () -> bool
return False

def start(self):
# type: () -> None
"""Start task queue processing."""
raise NotImplementedError()

def kill(self):
# type: () -> None
"""
Stop task queue processing. Returns immediately; it does not wait
for pending events to be sent. Use `flush` for that.
"""

def flush(self, timeout, callback=None):
# type: (float, Optional[FlushCallback]) -> None
"""Wait `timeout` seconds for the current tasks to be finished."""
pass

def submit(self, callback):
# type: (Callable[[], None]) -> None
"""Submit new task to the queue"""
raise NotImplementedError()


class BackgroundWorker(Worker):
def __init__(self, queue_size=30):
# type: (int) -> None
check_thread_support()
@@ -63,14 +99,16 @@ def start(self):
with self._lock:
if not self.is_alive:
self._thread = threading.Thread(
target=self._target, name="raven-sentry.BackgroundWorker"
target=self._target, name="sentry_sdk.BackgroundWorker"
)
self._thread.setDaemon(True)
self._thread.start()
self._thread_for_pid = os.getpid()

def kill(self):
# type: () -> None
# FIXME: This doesn't actually do what it claims. Putting the terminator
# on the queue means it will execute all the other tasks first.
"""
Kill the worker thread. Returns immediately; it does not wait for
pending events to be sent. Use `flush` for that.
@@ -87,15 +125,15 @@ def kill(self):
self._thread_for_pid = None

def flush(self, timeout, callback=None):
# type: (float, Optional[Any]) -> None
# type: (float, Optional[FlushCallback]) -> None
logger.debug("background worker got flush request")
with self._lock:
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
logger.debug("background worker flushed")

def _wait_flush(self, timeout, callback):
# type: (float, Optional[Any]) -> None
# type: (float, Optional[FlushCallback]) -> None
initial_timeout = min(0.1, timeout)
if not self._timed_queue_join(initial_timeout):
pending = self._queue.qsize()
@@ -126,3 +164,18 @@ def _target(self):
finally:
self._queue.task_done()
sleep(0)


def make_worker(options):
# type: (Dict[str, Any]) -> Optional[Worker]
ref_worker = options["worker"]

# If no worker is given, we use the background worker class
if ref_worker is None:
return BackgroundWorker()
elif isinstance(ref_worker, Worker):
return ref_worker
elif isinstance(ref_worker, type) and issubclass(ref_worker, Worker):
return ref_worker()
else:
return None
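
For reference, a minimal custom implementation of the new Worker interface (InlineWorker is a made-up name for illustration). make_worker() accepts either such a class or an instance of it, and returns None for anything else, which HttpTransport then rejects via its assert.

from sentry_sdk.worker import Worker

class InlineWorker(Worker):
    """Hypothetical worker that runs every task in the calling thread."""

    @property
    def is_alive(self):
        # type: () -> bool
        return True

    def start(self):
        # type: () -> None
        pass  # nothing to start; tasks never queue up

    def submit(self, callback):
        # type: (Callable[[], None]) -> None
        callback()  # run immediately instead of enqueueing

    def flush(self, timeout, callback=None):
        # type: (float, Optional[FlushCallback]) -> None
        pass  # nothing is ever pending
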
114 changes: 114 additions & 0 deletions sentry_sdk/worker/gevent_worker.py
@@ -0,0 +1,114 @@
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.worker import Worker
from sentry_sdk.utils import logger

try:
import gevent # type: ignore

from gevent.event import Event # type: ignore
from gevent.lock import RLock # type: ignore
from gevent.queue import Empty, Full, JoinableQueue # type: ignore
except ImportError:
raise DidNotEnable("gevent not installed")

from sentry_sdk._types import MYPY

if MYPY:
from typing import Optional
from typing import Callable

from sentry_sdk._types import FlushCallback


class GeventWorker(Worker):
def __init__(self, queue_size=30):
# type: (int) -> None
self._queue_size = queue_size
self._queue = JoinableQueue(self._queue_size)
self._kill_event = Event()
self._lock = RLock()
self._greenlet = None

@property
def is_alive(self):
# type: () -> bool
if not self._greenlet:
return False
return not self._greenlet.dead

def _ensure_greenlet(self):
# type: () -> None
if not self.is_alive:
self.start()

def start(self):
# type: () -> None
with self._lock:
if not self.is_alive:
self._greenlet = gevent.spawn(_task_loop, self._queue, self._kill_event)

def kill(self):
# type: () -> None
logger.debug("gevent worker got kill request")
with self._lock:
if self._greenlet:
self._kill_event.set()
self._greenlet.join(timeout=0.5)
if not self._greenlet.dead:
# Forcibly kill greenlet
logger.warning("gevent worker failed to terminate gracefully.")
self._greenlet.kill(block=False)
self._greenlet = None
self._queue = JoinableQueue(self._queue_size)
self._kill_event = Event()

def flush(self, timeout, callback=None):
# type: (float, Optional[FlushCallback]) -> None
logger.debug("gevent worker got flush request")
with self._lock:
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
logger.debug("gevent worker flushed")

def _wait_flush(self, timeout, callback):
# type: (float, Optional[FlushCallback]) -> None
initial_timeout = min(0.1, timeout)
if not self._queue.join(initial_timeout):
pending = self._queue.qsize()
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
callback(pending, timeout)
self._queue.join(timeout - initial_timeout)

def submit(self, callback):
# type: (Callable[[], None]) -> None
self._ensure_greenlet()
try:
self._queue.put_nowait(callback)
except Full:
logger.debug("gevent worker queue full, dropping event")


def _task_loop(queue, kill_event):
# type: (JoinableQueue, Event) -> None
while True:
try:
callback = queue.get(timeout=0.1)
if kill_event.is_set():
# NOTE: We want to kill before executing the task, but we also need
# to be able to kill on an empty queue, so we raise Empty to
# avoid code duplication
raise Empty
try:
callback()
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
queue.task_done()
except Empty:
pass

if kill_event.is_set():
logger.debug("gevent worker killed")
break
gevent.sleep(0)
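
Not part of the diff: a usage sketch for the gevent worker. As the test change below enforces, it assumes the process is monkey-patched with gevent before the SDK is set up. The DSN is a placeholder.

from gevent import monkey
monkey.patch_all()  # patch before importing anything that uses sockets or threads

import sentry_sdk
from sentry_sdk.worker.gevent_worker import GeventWorker

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    worker=GeventWorker,
)

sentry_sdk.capture_message("sent from a greenlet, not a thread")
sentry_sdk.flush(2.0)  # gives _task_loop time to drain the JoinableQueue
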
3 changes: 2 additions & 1 deletion setup.py
@@ -18,7 +18,7 @@ def get_file_text(file_name):
with open(os.path.join(here, file_name)) as in_file:
return in_file.read()


setup(
name="sentry-sdk",
version="0.19.4",
@@ -53,6 +53,7 @@ def get_file_text(file_name):
"pyspark": ["pyspark>=2.4.4"],
"pure_eval": ["pure_eval", "executing", "asttokens"],
"chalice": ["chalice>=1.16.0"],
"gevent": ["gevent>=1.4.0"],
},
classifiers=[
"Development Status :: 5 - Production/Stable",
9 changes: 8 additions & 1 deletion tests/test_transport.py
@@ -9,6 +9,7 @@
from sentry_sdk import Hub, Client, add_breadcrumb, capture_message
from sentry_sdk.transport import _parse_rate_limits
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.worker.gevent_worker import GeventWorker


@pytest.fixture
@@ -25,6 +26,7 @@ def inner(**kwargs):
@pytest.mark.parametrize("debug", (True, False))
@pytest.mark.parametrize("client_flush_method", ["close", "flush"])
@pytest.mark.parametrize("use_pickle", (True, False))
@pytest.mark.parametrize("worker", (None, GeventWorker))
def test_transport_works(
httpserver,
request,
@@ -34,11 +36,16 @@
make_client,
client_flush_method,
use_pickle,
worker,
maybe_monkeypatched_threading,
):
if worker == GeventWorker:
if maybe_monkeypatched_threading != "gevent":
pytest.skip("GeventWorker requires gevent.monkey")

httpserver.serve_content("ok", 200)
caplog.set_level(logging.DEBUG)
client = make_client(debug=debug)
client = make_client(debug=debug, worker=worker)

if use_pickle:
client = pickle.loads(pickle.dumps(client))
Empty file added tests/worker/__init__.py