# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later
# Imports
import os
from functools import wraps
from threading import get_ident
# Tango imports
from tango._tango import GreenMode
# Public names of this module: these are what ``from <module> import *``
# exposes. Helpers not listed here are internal.
__all__ = (
    "get_green_mode",
    "set_green_mode",
    "green",
    "green_callback",
    "get_executor",
    "get_object_executor",
    "switch_existing_global_executors_to_thread",
)
# Probe for gevent once at import time. The module object itself is not
# needed here, so the name is dropped again after a successful import.
try:
    import gevent
except ImportError:
    _gevent_available = False
else:
    del gevent
    _gevent_available = True
# Handle current green mode
# The initial global default is taken from the PYTANGO_GREEN_MODE
# environment variable. Its value is capitalized before the attribute
# lookup, so e.g. "gevent" or "GEVENT" both resolve to GreenMode.Gevent.
try:
    _CURRENT_GREEN_MODE = getattr(
        GreenMode, os.environ["PYTANGO_GREEN_MODE"].capitalize()
    )
except Exception:
    # Any failure (e.g. the variable is unset, or its value does not name
    # an attribute of GreenMode) falls back to the synchronous mode.
    _CURRENT_GREEN_MODE = GreenMode.Synchronous
[docs]
def set_green_mode(green_mode=None):
    """Sets the global default PyTango green mode.

    Advice: Use only in your final application. Don't use this in a python
    library in order not to interfere with the behavior of other libraries
    and/or applications where your library is being used.

    :param green_mode: the new global default PyTango green mode. If *None*
        (the default), the current global default is validated and kept
        unchanged.
    :type green_mode: GreenMode
    :raises TypeError: if *green_mode* is not a valid green mode (raised by
        :func:`get_executor`); the global default is left untouched then.
    """
    global _CURRENT_GREEN_MODE
    # Make sure the green mode is available. For None this checks the
    # current default, since get_executor() resolves None to it.
    get_executor(green_mode)
    # Set the green mode. None must never be stored: it would make
    # get_green_mode() return None and every later get_executor() call
    # without an explicit mode fail with "Not a valid green mode".
    if green_mode is not None:
        _CURRENT_GREEN_MODE = green_mode
[docs]
def get_green_mode():
    """Tell which green mode is currently the global PyTango default.

    The value is initialised when this module is imported and replaced by
    :func:`set_green_mode`.

    :returns: the global default green mode in effect right now
    :rtype: GreenMode
    """
    return _CURRENT_GREEN_MODE
# Abstract executor class
class AbstractExecutor:
    """Base class for green-mode executors.

    Subclasses set :attr:`asynchronous` and :attr:`default_wait`.
    With ``asynchronous`` false, :meth:`submit`, :meth:`execute` and
    :meth:`run` simply call the function inline. Asynchronous subclasses
    must override :meth:`delegate`, :meth:`access`, :meth:`submit` and
    :meth:`execute`, which raise ``NotImplementedError`` here.
    """

    # Whether operations are delegated (true) or run inline (false).
    asynchronous = NotImplemented
    # Value used for ``wait`` in run() when the caller passes None.
    default_wait = NotImplemented

    def __init__(self):
        # Remember which (thread id, process id) created this executor.
        self.ident = get_ident(), os.getpid()

    def get_ident(self):
        """Return the ``(thread id, process id)`` recorded at creation."""
        return self.ident

    def in_executor_context(self):
        """Return True if called from the creating thread and process."""
        return self.ident == (get_ident(), os.getpid())

    def delegate(self, fn, *args, **kwargs):
        """Delegate an operation and return an accessor."""
        if not self.asynchronous:
            raise ValueError("Not supported in synchronous mode")
        raise NotImplementedError

    def access(self, accessor, timeout=None):
        """Return a result from an accessor."""
        if not self.asynchronous:
            raise ValueError("Not supported in synchronous mode")
        raise NotImplementedError

    def submit(self, fn, *args, **kwargs):
        """Submit an operation"""
        if not self.asynchronous:
            return fn(*args, **kwargs)
        raise NotImplementedError

    def execute(self, fn, *args, **kwargs):
        """Execute an operation and return the result."""
        if not self.asynchronous:
            return fn(*args, **kwargs)
        raise NotImplementedError

    def run(self, fn, args=(), kwargs=None, wait=None, timeout=None):
        """Run ``fn(*args, **kwargs)`` according to the executor's mode.

        :param fn: the callable to run
        :param args: positional arguments for *fn*
        :param kwargs: keyword arguments for *fn* (*None* means none)
        :param wait: whether to wait for the result; *None* selects
            :attr:`default_wait`
        :param timeout: forwarded to :meth:`access` when waiting
        :returns: the result of *fn*, or the accessor returned by
            :meth:`delegate` when delegating with a false *wait*
        :raises ValueError: in synchronous mode, if *wait* is false or a
            *timeout* is given
        """
        # A None sentinel avoids sharing one dict between all calls.
        if kwargs is None:
            kwargs = {}
        if wait is None:
            wait = self.default_wait
        # Wait and timeout are not supported in synchronous mode
        if not self.asynchronous and (not wait or timeout):
            raise ValueError("Not supported in synchronous mode")
        # Synchronous (no delegation): also taken by asynchronous executors
        # when the caller is not in the thread/process that created them.
        if not self.asynchronous or not self.in_executor_context():
            return fn(*args, **kwargs)
        # Asynchronous delegation
        accessor = self.delegate(fn, *args, **kwargs)
        if not wait:
            return accessor
        return self.access(accessor, timeout=timeout)
class SynchronousExecutor(AbstractExecutor):
    """Executor whose operations are called inline, without delegation."""

    # Callers always get the result itself, never an accessor.
    default_wait = True
    asynchronous = False
# Default synchronous executor: a single instance shared module-wide.
_SYNCHRONOUS_EXECUTOR = SynchronousExecutor()


def get_synchronous_executor():
    """Return the shared module-level :class:`SynchronousExecutor`."""
    return _SYNCHRONOUS_EXECUTOR
# Getters
def get_object_green_mode(obj):
    """Return the green mode that applies to *obj*.

    Objects exposing a ``get_green_mode`` attribute are asked directly;
    anything else gets the global default.

    :param obj: any object (*None* included)
    :returns: the object's own green mode or the global default one
    """
    if not hasattr(obj, "get_green_mode"):
        return get_green_mode()
    return obj.get_green_mode()
def get_executor(green_mode=None):
    """Return the global executor that serves *green_mode*.

    The backend module of a non-synchronous mode is imported only when
    that mode is actually requested.

    :param green_mode: the requested mode; *None* selects the global
        default green mode
    :type green_mode: GreenMode
    :returns: the synchronous executor or the backend's global executor
    :raises TypeError: if *green_mode* matches no known green mode
    """
    mode = get_green_mode() if green_mode is None else green_mode
    # The synchronous executor lives in this module; no backend needed.
    if mode == GreenMode.Synchronous:
        return get_synchronous_executor()
    # Pick the backend module for the remaining valid modes.
    if mode == GreenMode.Gevent:
        from tango import gevent_executor as backend
    elif mode == GreenMode.Futures:
        from tango import futures_executor as backend
    elif mode == GreenMode.Asyncio:
        from tango import asyncio_executor as backend
    else:
        # Invalid green mode
        raise TypeError("Not a valid green mode")
    return backend.get_global_executor()
def switch_existing_global_executors_to_thread():
    """
    Checks which global executors exist and whether they belong to the
    calling thread; if not, a new executor linked to this thread is created
    and set as the global one.

    The work is done by each backend's ``_switch_global_executor_to_thread``,
    called for asyncio, futures and (only if gevent is importable) gevent,
    in that order.
    """
    from tango import asyncio_executor, futures_executor

    backends = [asyncio_executor, futures_executor]
    if _gevent_available:
        from tango import gevent_executor

        backends.append(gevent_executor)
    for backend in backends:
        backend._switch_global_executor_to_thread()
def get_object_executor(obj, green_mode=None):
    """Find the executor to use for *obj*.

    An object carrying an ``_executors`` mapping is looked up there first,
    keyed by green mode. When that yields nothing, the global executor for
    the green mode is used instead.

    Note: *None* is a valid object.

    :param obj: the object the executor is for
    :param green_mode: the mode to use; *None* selects the object's own
        green mode (or the global default if it has none)
    :returns: the executor"""
    # Resolve the green mode
    if green_mode is None:
        green_mode = get_object_green_mode(obj)
    # An executor attached to the object takes precedence
    if hasattr(obj, "_executors"):
        own_executor = obj._executors.get(green_mode, None)
        if own_executor is not None:
            return own_executor
    # Otherwise fall back to the global executor
    return get_executor(green_mode)
# Green modifiers
def green(fn=None, consume_green_mode=True):
    """Turn a function into a green one. Usable as a decorator, either
    bare (``@green``) or with arguments (``@green(...)``).

    The wrapper takes the ``wait`` and ``timeout`` keyword arguments out of
    the call and hands them to the executor of the first positional
    argument. ``green_mode`` is removed from the keyword arguments as well,
    unless *consume_green_mode* is false, in which case it is also passed
    on to the wrapped function.
    """

    def decorator(fn):
        @wraps(fn)
        def greener(obj, *args, **kwargs):
            wait = kwargs.pop("wait", None)
            timeout = kwargs.pop("timeout", None)
            if consume_green_mode:
                green_mode = kwargs.pop("green_mode", None)
            else:
                green_mode = kwargs.get("green_mode", None)
            executor = get_object_executor(obj, green_mode)
            # The object is passed to fn as its first positional argument.
            return executor.run(
                fn, (obj, *args), kwargs, wait=wait, timeout=timeout
            )

        return greener

    return decorator if fn is None else decorator(fn)
def green_callback(fn, obj=None, green_mode=None):
    """Return a green version of the given callback.

    The executor is resolved once, when this function is called; each
    invocation of the returned callback submits *fn* to that executor.

    :param fn: the callback to wrap
    :param obj: the object whose executor should be used (may be *None*)
    :param green_mode: the green mode to use; *None* lets it be resolved
        from *obj*
    :returns: a callable forwarding its arguments to *fn* via the executor
    """
    executor = get_object_executor(obj, green_mode)

    @wraps(fn)
    def greener(*cb_args, **cb_kwargs):
        return executor.submit(fn, *cb_args, **cb_kwargs)

    return greener