Aug 17, 2022·Python

Scheduled Tasks in Django with APScheduler

Hola muy buenas noches a todos, espero que esten bien. Yo me encuentro bien, más ahora que termine de ver Better Call Saul.

Me terminé por decidir por y su relativa en Python porque es el que encontré que posee una mejorar comunidad que mantiene el repositorio y honestamente esto se ve reflejado en la facilidad de adopción.

Voy a describir los pasos que realicé brevemente:

En primer lugar hay que instalar las dependencias.

pip install django-apscheduler
pip install apscheduler

Recuerden que si están en un entorno virtual hay que reemplazar pip por pipenv

Luego de esto debemos agregar a nuestro settings.py la correspondiente dependencia, django_apscheduler.

INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'corsheaders',
'rest_framework',
'django_seed',
'django_apscheduler',
'newapp',
]

Y también dentro de ese mismo archivo debemos agregar las siguientes variables:

APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" # Default
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # Seconds
SCHEDULER_DEFAULT = True

Ahora habrá que crear un archivo que se puede llamar por ejemplo, operator.py (que deberá ser creado dentro de la carpeta de nuestra app creada) cuyo contenido sería algo como esto:

from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import register_events, DjangoJobStore
from newapp.views import send_hello
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler import util
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
def auto_hello():
send_hello()
@util.close_old_connections
def start():
scheduler = BackgroundScheduler(standalone=True)
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.start()
scheduler.add_job(auto_hello, 'interval', seconds=1)

Ahora lo que debemos hacer es añadir una función a nuestra app creada, esto lo que va a hacer es iniciar el scheduler definido anteriormente:

from django.apps import AppConfig
from django.conf import settings
class NewAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'newapp'
def ready(self):
if settings.SCHEDULER_DEFAULT:
from newapp import operator
operator.start()