Looking at a certain OSS code, there was something that looked good, so let me introduce it.
A certain OSS is Apache Airflow. Among them, airflow / utils / session.py was a good feeling.
First, from session.py
.
import contextlib
from functools import wraps
from airflow import settings
# contextlib.If you specify contextmanager, it will automatically close using with.
@contextlib.contextmanager
def create_session():
"""
Contextmanager that will create and teardown a session.
"""
session = settings.Session()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
#I want to use Session It complements Session nicely when used in a function
def provide_session(func):
"""
Function decorator that provides a session if it isn't provided.
If you want to reuse a session or run the function as part of a
database transaction, you pass it to the function, if not this wrapper
will create one and close it for you.
"""
@wraps(func)
def wrapper(*args, **kwargs):
arg_session = 'session'
func_params = func.__code__.co_varnames
session_in_args = arg_session in func_params and \
func_params.index(arg_session) < len(args)
session_in_kwargs = arg_session in kwargs
#If there is a session in the argument of function, use it, if not, create it
if session_in_kwargs or session_in_args:
return func(*args, **kwargs)
else:
with create_session() as session:
kwargs[arg_session] = session
return func(*args, **kwargs)
return wrapper
So, the code on the side to use next. Line 940 in airflow / models / baseoperator.py /airflow/blob/master/airflow/models/baseoperator.py#L940)
@provide_session
def get_task_instances(self, start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
session: Session = None) -> List[TaskInstance]:
"""
Get a set of task instance related to this task for a specific date
range.
"""
end_date = end_date or timezone.utcnow()
return session.query(TaskInstance)\
.filter(TaskInstance.dag_id == self.dag_id)\
.filter(TaskInstance.task_id == self.task_id)\
.filter(TaskInstance.execution_date >= start_date)\
.filter(TaskInstance.execution_date <= end_date)\
.order_by(TaskInstance.execution_date)\
.all()
If you specify it as a decorator with a function that uses session
like
If session
is not set in the caller, it will be newly created (and will be closed at the end).
If you have set session
, you can achieve a nice feeling of using it as it is.
I think this method can be applied in various ways other than DB connection.
Recommended Posts