Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A3s 843 update pymongo #248

Draft
wants to merge 53 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
5c8c401
add support to delay a job and enqueue it N seconds later
orlandobcrra Mar 24, 2019
1e75c7a
getting latest job with certain status
kevteg Apr 22, 2019
211ac8f
Moving method to job methods
kevteg Apr 23, 2019
4e8696d
updating name and version
kevteg Apr 23, 2019
b7eb6c9
0.9.12 version
kevteg Apr 23, 2019
556c40e
using query instead of status and path
kevteg Apr 23, 2019
5074b4c
042419IC Add support to delete success or cancel jobs that are alread…
icardenas Apr 24, 2019
cd1a56d
042419IC draft test for check test_delete_expired_jobs
icardenas Apr 24, 2019
91001d9
Update Version 0.9.13
icardenas Apr 24, 2019
6c17383
Update Version 0.9.14
icardenas Apr 24, 2019
1377520
Delete TODO word
icardenas Apr 25, 2019
78a1e87
Merge pull request #2 from apploitech/042419IC_add_remove_expires_jobs
icardenas Apr 25, 2019
c038608
042419IC verify version of python to iterate dictionary
icardenas Apr 25, 2019
7beb029
042419IC Update version
icardenas Apr 25, 2019
5b36b23
Update utils.py
icardenas Apr 26, 2019
42957f3
update version
icardenas Apr 26, 2019
7d51f8f
042419IC Fix and update version
icardenas Apr 26, 2019
87fe03b
Merge pull request #3 from apploitech/042419IC_fix_dictionary_iterati…
icardenas Apr 26, 2019
4eca72b
Add status maxretries in endpoint '/api/datatables/taskexceptions'
icardenas Apr 26, 2019
721ba39
Add status maxretries in endpoint '/api/datatables/taskexceptions'
icardenas Apr 26, 2019
e9fe738
Add the status field in the search for exceptions
icardenas Apr 28, 2019
e76e1a0
042619IC Update Version
icardenas Apr 28, 2019
fc95b0a
minor fix bundle
icardenas Apr 28, 2019
5d1bffc
Merge pull request #4 from apploitech/042619IC_add_maxretries_in_task…
icardenas Apr 28, 2019
74d913f
Adding get latest jobs based on a query
Darking360 May 3, 2019
45bc6c7
Adding sort
Darking360 May 6, 2019
6c85adf
Increasing MRQ lib version
Darking360 May 6, 2019
5ca8555
Merge pull request #5 from apploitech/050319MR_add_latest_jobs_query
Darking360 May 6, 2019
63fa50a
Add Redis SSL support
ghabrielv Jan 6, 2021
634c217
Increasing MRQ lib version
ghabrielv Jan 6, 2021
6fdfd29
Fix pypy portable url
ghabrielv Jan 6, 2021
5076764
Update dockerfile
ghabrielv Jan 6, 2021
ff2a3a4
Update dockerfile
ghabrielv Jan 6, 2021
ddda42a
Update library psutil
ghabrielv Jan 6, 2021
adc365e
Update requirements and dockerfile
ghabrielv Jan 6, 2021
964a14b
Specific log description content type
ghabrielv Jan 8, 2021
dd8cb5f
Version 0.9.23 and redis password
ghabrielv Jan 8, 2021
78485ac
Merge pull request #6 from apploitech/redis-ssl
ghabrielv Mar 1, 2021
aa98be8
Sentry cleanup (#7)
ghabrielv Mar 3, 2021
c902409
Upgrade psutil to 5.8.0 (#8)
ghabrielv May 19, 2021
e6d05e9
A3S-1593 - if "retry_current_job" is call outside of an MRQ context, …
orlandobcrra Jul 20, 2022
3389b17
A3S-2518 Remove print line and document the process of generating a n…
ghabrielv Feb 15, 2023
bd6ba3d
Update processes.py
orlandobcrra Apr 4, 2023
37ee776
A3S-2202: version upgrade
Apr 4, 2023
b8ecdf4
A3S-2753: Updated redis to 4.5.5 version
ghabrielv May 9, 2023
8b19f1e
A3S-2749: Settled MRQ version to 0.9.32
ghabrielv May 9, 2023
4143816
Merge pull request #13 from apploitech/A3S-2753
Sarahoyos May 11, 2023
adf6ec9
A3S-2753 Fixed redis SSL connection (#14)
ghabrielv May 15, 2023
c1d55de
A3S-2753: Fixed redis SSL connection (#15)
ghabrielv May 17, 2023
4f68e94
A3S-843: update pymongo to support python 3.11
May 30, 2023
e630216
Merge pull request #16 from apploitech/a3s-843_update_pymongo
jefarr-apploi Jun 7, 2023
a8c85bd
a3s-843: revert pymongo verson to 3.13
Jun 13, 2023
731483b
Merge branch 'master' into a3s-843_update_pymongo
Jun 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM debian:jessie
FROM debian:buster-slim

#
# httpredir.debian.org is often unreliable
Expand All @@ -11,32 +11,36 @@ FROM debian:jessie
# deb http://security.debian.org jessie/updates main\n' \
# > /etc/apt/sources.list

RUN apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
RUN echo "deb http://repo.mongodb.org/apt/debian jessie/mongodb-org/3.4 main" > /etc/apt/sources.list.d/mongodb-org-3.4.list
RUN apt-get update && \
apt-get install -y --no-install-recommends \
curl \
gcc \
python-dev \
python-pip \
python-setuptools \
python3-pip \
python3-dev \
python3-setuptools \
make \
git \
vim \
bzip2 \
mongodb-org \
nginx redis-server \
g++ \
apt-utils \
&& \
apt-get clean -y && \
rm -rf /var/lib/apt/lists/*

RUN curl -sL https://deb.nodesource.com/setup_7.x | bash -
RUN curl -sL https://deb.nodesource.com/setup_12.x | bash -
RUN apt-get install -y --no-install-recommends nodejs

RUN curl -sL https://www.mongodb.org/static/pgp/server-4.4.asc | apt-key add -
RUN echo "deb http://repo.mongodb.org/apt/debian buster/mongodb-org/4.4 main" > /etc/apt/sources.list.d/mongodb-org-4.4.list
RUN apt-get update && apt-get install -y mongodb-org

# Download pypy
RUN curl -sL 'https://bitbucket.org/squeaky/portable-pypy/downloads/pypy-5.8-1-linux_x86_64-portable.tar.bz2' > /pypy.tar.bz2 && tar jxvf /pypy.tar.bz2 && rm -rf /pypy.tar.bz2 && mv /pypy-* /pypy
RUN curl -sL 'https://github.com/squeaky-pl/portable-pypy/releases/download/pypy-7.2.0/pypy-7.2.0-linux_x86_64-portable.tar.bz2' > /pypy.tar.bz2 && tar jxvf /pypy.tar.bz2 && rm -rf /pypy.tar.bz2 && mv /pypy* /pypy

# Upgrade pip
RUN pip install --upgrade --ignore-installed pip
Expand All @@ -48,16 +52,16 @@ ADD requirements-base.txt /app/requirements-base.txt
ADD requirements-dev.txt /app/requirements-dev.txt
ADD requirements-dashboard.txt /app/requirements-dashboard.txt

RUN pip3 install -r /app/requirements-heroku.txt && \
pip3 install -r /app/requirements-base.txt && \
pip3 install -r /app/requirements-dev.txt && \
pip3 install -r /app/requirements-dashboard.txt && \
RUN python3 -m pip install -r /app/requirements-heroku.txt && \
python3 -m pip install -r /app/requirements-base.txt && \
python3 -m pip install -r /app/requirements-dev.txt && \
python3 -m pip install -r /app/requirements-dashboard.txt && \
rm -rf ~/.cache

RUN pip install -r /app/requirements-heroku.txt && \
pip install -r /app/requirements-base.txt && \
pip install -r /app/requirements-dev.txt && \
pip install -r /app/requirements-dashboard.txt && \
RUN python -m pip install -r /app/requirements-heroku.txt && \
python -m pip install -r /app/requirements-base.txt && \
python -m pip install -r /app/requirements-dev.txt && \
python -m pip install -r /app/requirements-dashboard.txt && \
rm -rf ~/.cache

RUN /pypy/bin/pip install -r /app/requirements-heroku.txt && \
Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ This was a preview on the very basic features of MRQ. What makes it actually use
These features will be demonstrated in a future example of a simple web crawler.


## How to compile a new version of MRQ
- Go to mrq/version.py file and update from 0.9.28.1 to 0.9.29 (for example)
- Commit and pushes the last changes
- Go to in the master branch (after merge pull request)
- Execute the command `python setup.py sdist`
- Make sure you have installed the python twine package (`pip install twine`).
- Execute the twine command to upload the new version of mrq to pypi.org (`twine upload dist/mrq-custom-0.9.29.tar.gz`)
- In the previous step you will be asked for the pypi.org credentials are in 1password, you must request them in #password_requests
- Update all applications that use MRQ with this version in their requirements.txt


# More

Full documentation is available on [readthedocs](http://mrq.readthedocs.org/en/latest/)
7 changes: 7 additions & 0 deletions docs/jobs-maintenance.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ To do that, you should add these recurring scheduled jobs to your mrq-config.py:
```
SCHEDULER_TASKS = [

# This will queue jobs in the 'delayed' status.
{
"path": "mrq.basetasks.cleaning.QueueDelayedJobs",
"params": {},
"interval": 60
},

# This will requeue jobs in the 'retry' status, until they reach their max_retries.
{
"path": "mrq.basetasks.cleaning.RequeueRetryJobs",
Expand Down
8 changes: 7 additions & 1 deletion docs/jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ A **Job** is an instance of the execution of a Task. It must link to a specific

MRQ defines a list of statuses for jobs. A job can only be in one of them at a time.

When everything goes fine, a job will go through 3 statuses:
When everything goes fine, a job will go through 4 statuses:

* ```delayed```: The Job has been created and it is delayed to be queued later.
* ```queued```: The Job has been created and it is waiting to be dequeued by a Worker.
* ```started```: A Worker has dequeued the job and started executing it.
* ```success```: The job was successfully ran.
Expand Down Expand Up @@ -51,6 +52,11 @@ Queues a job. If `queue` is not provided, the default queue for that Task as def

Queues multiple jobs at once. Returns a list of IDs of the jobs.

* `queue_job(main_task_path, params, delay=120, queue=None)`

Create a job with `delayed` status, the job will be queued after of at least `delay` seconds.
Remember to add the base delayed job as explained in [Jobs maintenance](jobs-maintenance.md) to have `delayed` jobs actually queued.

* `queue_raw_jobs(queue, params_list, batch_size=1000)`

Queues multiple jobs at once on a [raw queue](queues.md#raw-queues). The queued jobs have no IDs on a raw queue so this function has no return.
Expand Down
6 changes: 5 additions & 1 deletion mrq/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import traceback
from collections import defaultdict
from bson import ObjectId
from redis.lock import LuaLock
try:
from redis.lock import LuaLock
except ImportError:
# Change name to avoid NameError raised when use of LuaLock at line 147
from redis.lock import Lock as LuaLock
from .processes import Process, ProcessPool
from .utils import MovingETA, normalize_command
from .queue import Queue
Expand Down
29 changes: 28 additions & 1 deletion mrq/basetasks/cleaning.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,41 @@ class RequeueRetryJobs(Task):
max_concurrency = 1

def run(self, params):
print("IN")
return run_task("mrq.basetasks.utils.JobAction", {
"status": "retry",
"dateretry": {"$lte": datetime.datetime.utcnow()},
"action": "requeue_retry"
})


class QueueDelayedJobs(Task):

""" Requeue jobs that were marked as delayed. """

max_concurrency = 1

def run(self, params):
return run_task("mrq.basetasks.utils.JobAction", {
"status": "delayed",
"dateretry": {"$lte": datetime.datetime.utcnow()},
"action": "requeue"
})


class DeleteExpiresJobs(Task):

""" Delete jobs that were dateexpires is less than the current date time. """

max_concurrency = 1

def run(self, params):
return run_task("mrq.basetasks.utils.JobAction", {
"status": ["success","cancel"],
"dateexpires": {"$lte": datetime.datetime.utcnow()},
"action": "delete"
})


class RequeueStartedJobs(Task):

""" Requeue jobs that were marked as status=started and never finished.
Expand Down
16 changes: 12 additions & 4 deletions mrq/basetasks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from mrq.utils import group_iter
import datetime
import ujson as json
import sys


def get_task_cfg(taskpath):
Expand Down Expand Up @@ -45,6 +46,7 @@ def build_query(self):
"status",
"worker",
"path",
"dateexpires",
"dateretry",
"exceptiontype"]:
if self.params.get(k):
Expand All @@ -70,7 +72,8 @@ def perform_action(self, action, query, destination_queue):

stats = {
"requeued": 0,
"cancelled": 0
"cancelled": 0,
"deleted": 0
}

if action == "cancel":
Expand Down Expand Up @@ -151,7 +154,12 @@ def perform_action(self, action, query, destination_queue):
self.collection.update({
"_id": {"$in": jobs_by_queue[queue]}
}, {"$set": updates}, multi=True)

set_queues_size({queue: len(jobs) for queue, jobs in jobs_by_queue.iteritems()})

if sys.version_info.major > 2:
set_queues_size({queue: len(jobs) for queue, jobs in jobs_by_queue.items()})
else:
set_queues_size({queue: len(jobs) for queue, jobs in jobs_by_queue.iteritems()})
elif action == 'delete':
amount_delete = self.collection.delete_many(query)
stats["deleted"] = amount_delete.deleted_count
return stats

26 changes: 21 additions & 5 deletions mrq/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from .utils import LazyObject, load_class_by_path
from .config import get_config
from .subpool import subpool_map, subpool_imap
from .exceptions import JobRuntimeError


# This should be MRQ's only Python object shared by all the jobs in the same process
_GLOBAL_CONTEXT = {
Expand Down Expand Up @@ -113,10 +115,14 @@ def get_current_config():
return _GLOBAL_CONTEXT["config"]


def retry_current_job(delay=None, max_retries=None, queue=None):
def retry_current_job(delay=None, max_retries=None, queue=None, exception=None):
current_job = get_current_job()
if current_job:
current_job.retry(delay=delay, max_retries=max_retries, queue=queue)
elif exception:
raise JobRuntimeError("Error during MRQ task execution") from exception
else:
raise JobRuntimeError("Error during MRQ task execution")


def abort_current_job():
Expand All @@ -139,22 +145,32 @@ def versiontuple(v):
if isinstance(config_obj, basestring):

import redis as pyredis
import redis.connection as pyredisconnection

connection_class = pyredisconnection.Connection
urllib.parse.uses_netloc.append('redis')
redis_url = urllib.parse.urlparse(config_obj)

log.info("%s: Connecting to Redis at %s..." %
(attr, redis_url.hostname))

redis_pool = pyredis.BlockingConnectionPool(
redis_params = dict(
host=redis_url.hostname,
port=redis_url.port,
db=int((redis_url.path or "").replace("/", "") or "0"),
password=redis_url.password,
password=redis_url.password if redis_url.password is not None else redis_url.username,
max_connections=int(config.get("redis_max_connections")),
timeout=int(config.get("redis_timeout")),
decode_responses=False
decode_responses=False,
connection_class=connection_class
)

if redis_url.scheme == "rediss":
redis_params["connection_class"] = pyredisconnection.SSLConnection
redis_params["ssl_cert_reqs"] = "none"

redis_pool = pyredis.BlockingConnectionPool(**redis_params)

return pyredis.StrictRedis(connection_pool=redis_pool)

# Let's just assume we got a StrictRedis-like object!
Expand Down
4 changes: 2 additions & 2 deletions mrq/dashboard/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ def root():
@requires_auth
def api_task_exceptions():
stats = list(connections.mongodb_jobs.mrq_jobs.aggregate([
{"$match": {"status": "failed"}},
{"$group": {"_id": {"path": "$path", "exceptiontype": "$exceptiontype"},
{"$match": {"status": {"$in": ["failed","maxretries"]}}},
{"$group": {"_id": {"path": "$path", "exceptiontype": "$exceptiontype","status":"$status"},
"jobs": {"$sum": 1}}},
]))

Expand Down
2 changes: 1 addition & 1 deletion mrq/dashboard/static/bin/0.bundle.js

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions mrq/dashboard/static/js/views/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,9 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
if (source.time) {
display.push("cputime "+String(source.time).substring(0,6)+"s ("+source.switches+" switches)");
}
if (source.status === 'retry' || source.status === 'delayed') {
display.push("requeue "+moment.utc(source.dateretry).fromNow());
}

return "<small>" + display.join("<br/>") + "</small>";

Expand Down
4 changes: 2 additions & 2 deletions mrq/dashboard/static/js/views/taskexceptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
"sClass": "col-exception",
"sType":"numeric",
"mData":function(source, type, val) {
return "<a href='/#jobs?path="+source._id.path+"&status=failed&exceptiontype="+source._id.exceptiontype+ "'>"+source._id.exceptiontype+"</a>"
return "<a href='/#jobs?path=" + source._id.path + "&status=" + source._id.status +"&exceptiontype="+source._id.exceptiontype+ "'>"+source._id.exceptiontype+"</a>"
}
},
{
Expand All @@ -43,7 +43,7 @@ define(["jquery", "underscore", "views/generic/datatablepage", "models"],functio
var cnt = source.jobs || 0;

if (type == "display") {
return "<a href='/#jobs?path="+source._id.path+"&status=failed&exceptiontype="+source._id.exceptiontype+"'>"+cnt+"</a>"
return "<a href='/#jobs?path=" + source._id.path + "&status=" + source._id.status +"&exceptiontype="+source._id.exceptiontype+"'>"+cnt+"</a>"
+ "<br/>"
+ '<span class="inlinesparkline" values="'+self.addToCounter("taskexceptions."+source._id.path+" "+source._id.exceptiontype, cnt, 50).join(",")+'"></span>';
} else {
Expand Down
1 change: 1 addition & 0 deletions mrq/dashboard/templates/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ <h4 class="modal-title"></h4>
<select class="form-control input-sm js-datatable-filters-status" id="jobs-form-status">
<option <%= filters.status==""?"selected='selected'":"" %> value="">-statuses-</option>
<% _.each({
"delayed": "delayed",
"queued": "queued",
"started": "started",
"success": "success",
Expand Down
26 changes: 23 additions & 3 deletions mrq/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,33 @@ class TimeoutInterrupt(_MrqInterrupt):


class AbortInterrupt(_MrqInterrupt):
pass
path = None

def _get_exception_name(self):
return "%s" % (
self.path
)


class RetryInterrupt(_MrqInterrupt):
delay = None
queue = None
retry_count = 0
path = None

def _get_exception_name(self):
return "%s #%s: %s seconds, %s queue" % (
self.__class__.__name__, self.retry_count, self.delay, self.queue
self.path, self.retry_count, self.delay, self.queue
)


class MaxRetriesInterrupt(_MrqInterrupt):
pass
path = None

def _get_exception_name(self):
return "%s" % (
self.path
)


class StopRequested(GreenletExit):
Expand All @@ -53,4 +64,13 @@ class JobInterrupt(GreenletExit):


class MaxConcurrencyInterrupt(_MrqInterrupt):
path = None

def _get_exception_name(self):
return "%s" % (
self.path
)


class JobRuntimeError(_MrqInterrupt):
pass
Loading