Skip to content

Commit

Permalink
When determining available memory and cores, take into account cgroup…
Browse files Browse the repository at this point in the history
… limits (#905)

When determining available memory and cores, take into account cgroup limits, which are used e.g. by docker run's --cpus and --memory options.  Currently we use memory and cores values for the host machine, not the container.
  • Loading branch information
notestaff authored Dec 10, 2018
1 parent 581d79a commit 3122be6
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 32 deletions.
45 changes: 45 additions & 0 deletions docker/calc_mem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env python

"""Calculate the memory allocated to the process, taking account of cgroups.
Print result to stdout.
"""

import argparse
import sys
import os
import os.path

# Pass the text as `description=`: the original code passed it as the first
# positional argument, which argparse interprets as `prog` (the program name
# shown in usage lines), not as a description.  Also fixes the
# "Calculated" -> "Calculate" typo.
parser = argparse.ArgumentParser(
    description='Calculate memory allocated to the process')
parser.add_argument('mem_unit', choices=('mb', 'gb'), help='memory units')
parser.add_argument('mem_fraction', type=int,
                    help='what fraction of total memory to report')
args = parser.parse_args()

# `type=int` guarantees an integer but not a sensible range; enforce it here.
if not (1 <= args.mem_fraction <= 100):
    raise RuntimeError("mem_fraction should be in the range [1,100]")

# Multipliers for single-letter unit prefixes, used to parse /proc/meminfo
# values and to convert the final byte count into the requested unit.
unit2factor = {'k': 1024, 'm': 1024 * 1024, 'g': 1024 * 1024 * 1024}

def mem_from_proc_meminfo():
    """Return the total memory, in bytes, as given by /proc/meminfo"""
    with open('/proc/meminfo') as meminfo:
        for record in meminfo:
            if not record.startswith('MemTotal:'):
                continue
            # Line looks like "MemTotal:  16334404 kB"; fields 1 and 2 are
            # the amount and its unit.
            fields = record.strip().split()
            amount, unit = fields[1], fields[2]
            return int(amount) * unit2factor[unit[0].lower()]
    raise RuntimeError('Could not get MemTotal from /proc/meminfo')

def mem_from_cgroups(limit_files=('/sys/fs/cgroup/memory/memory.limit_in_bytes',
                                  '/sys/fs/cgroup/memory.max')):
    """Return the memory limit, in bytes, imposed by cgroups, or sys.maxsize
    if no limit is configured.

    Consults the cgroup v1 limit file first, then the cgroup v2 one
    (docker on cgroup-v2 hosts writes /sys/fs/cgroup/memory.max; the
    original code only checked v1 and so reported no limit there).

    Args:
        limit_files: candidate cgroup limit files to consult, in order.
    """
    for fname in limit_files:
        if not os.path.isfile(fname):
            continue
        with open(fname) as f:
            val = f.read().strip()
        # cgroup v2 reports the literal string "max" when no limit is set.
        if not val or val == 'max':
            continue
        # The limit is normally a plain byte count, but tolerate an optional
        # k/m/g suffix.  (The original `int(val) * unit2factor.get(val[-1], 1)`
        # would raise ValueError on any value that actually had a suffix.)
        suffix = val[-1].lower()
        if suffix in ('k', 'm', 'g'):
            return int(val[:-1]) * {'k': 1024,
                                    'm': 1024 ** 2,
                                    'g': 1024 ** 3}[suffix]
        return int(val)
    return sys.maxsize

# The effective limit is the smaller of the host total and any cgroup cap.
total_bytes = min(mem_from_proc_meminfo(), mem_from_cgroups())
# Convert bytes -> requested unit ('mb'/'gb' keys off the first letter),
# then scale by the requested percentage and print as a whole number.
unit_divisor = float(unit2factor[args.mem_unit[0]])
fraction = float(args.mem_fraction) / 100.0
print(int((float(total_bytes) / unit_divisor) * fraction))
3 changes: 0 additions & 3 deletions docker/mem_in_gb_90.sh

This file was deleted.

3 changes: 0 additions & 3 deletions docker/mem_in_mb_50.sh

This file was deleted.

3 changes: 0 additions & 3 deletions docker/mem_in_mb_80.sh

This file was deleted.

3 changes: 0 additions & 3 deletions docker/mem_in_mb_85.sh

This file was deleted.

3 changes: 0 additions & 3 deletions docker/mem_in_mb_90.sh

This file was deleted.

3 changes: 0 additions & 3 deletions docker/mem_in_mb_95.sh

This file was deleted.

10 changes: 5 additions & 5 deletions pipes/WDL/workflows/tasks/tasks_assembly.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ task assemble {
set -ex -o pipefail

# find 90% memory
mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh`
mem_in_gb=`/opt/viral-ngs/source/docker/mem_in_gb_90.sh`
mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90`
mem_in_gb=`/opt/viral-ngs/source/docker/calc_mem.py gb 90`

if [[ "${assembler}" == "trinity" ]]; then
assembly.py assemble_trinity \
Expand Down Expand Up @@ -110,7 +110,7 @@ task scaffold {
set -ex -o pipefail

# find 90% memory
mem_in_gb=`/opt/viral-ngs/source/docker/mem_in_gb_90.sh`
mem_in_gb=`/opt/viral-ngs/source/docker/calc_mem.py gb 90`

assembly.py order_and_orient \
${contigs_fasta} \
Expand Down Expand Up @@ -187,7 +187,7 @@ task refine {
set -ex -o pipefail

# find 90% memory
mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh`
mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90`

# prep GATK
mkdir gatk
Expand Down Expand Up @@ -258,7 +258,7 @@ task refine_2x_and_plot {
set -ex -o pipefail

# find 90% memory
mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh`
mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90`

# prep GATK
mkdir gatk
Expand Down
4 changes: 2 additions & 2 deletions pipes/WDL/workflows/tasks/tasks_demux.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ task illumina_demux {
set -ex -o pipefail

# find N% memory
mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_85.sh`
mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 85`

if [ -d /mnt/tmp ]; then
TMPDIR=/mnt/tmp
Expand Down Expand Up @@ -108,7 +108,7 @@ task illumina_demux {
elif [ "$total_tile_count" -le 896 ]; then
echo "Detected $total_tile_count tiles, interpreting as HiSeq4k run."
elif [ "$total_tile_count" -le 1408 ]; then
mem_in_mb=$(/opt/viral-ngs/source/docker/mem_in_mb_80.sh)
mem_in_mb=$(/opt/viral-ngs/source/docker/calc_mem.py mb 80)
demux_threads=20 # with NovaSeq-size output, OOM errors can sporadically occur with higher thread counts
echo "Detected $total_tile_count tiles, interpreting as NovaSeq run."
echo " **Note: Q20 threshold used since NovaSeq with RTA3 writes only four Q-score values: 2, 12, 23, and 37.**"
Expand Down
2 changes: 1 addition & 1 deletion pipes/WDL/workflows/tasks/tasks_metagenomics.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ task diamond_contigs {
DIAMOND_TAXDB_DIR=$(mktemp -d)

# find 90% memory
mem_in_gb=`/opt/viral-ngs/source/docker/mem_in_gb_90.sh`
mem_in_gb=`/opt/viral-ngs/source/docker/calc_mem.py gb 90`

# decompress DBs to /mnt/db
cat ${diamond_db_lz4} | lz4 -d > $TMPDIR/diamond_db.dmnd &
Expand Down
8 changes: 4 additions & 4 deletions pipes/WDL/workflows/tasks/tasks_taxon_filter.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ task deplete_taxa {
fi

# find memory thresholds
mem_in_mb_50=`/opt/viral-ngs/source/docker/mem_in_mb_50.sh`
mem_in_mb_90=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh`
mem_in_mb_50=`/opt/viral-ngs/source/docker/calc_mem.py mb 50`
mem_in_mb_90=`/opt/viral-ngs/source/docker/calc_mem.py mb 90`

# bmtagger and blast db args
DBS_BMTAGGER="${sep=' ' bmtaggerDbs}"
Expand Down Expand Up @@ -92,7 +92,7 @@ task filter_to_taxon {
set -ex -o pipefail

# find 90% memory
mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh`
mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90`

taxon_filter.py filter_lastal_bam \
${reads_unmapped_bam} \
Expand Down Expand Up @@ -149,7 +149,7 @@ task merge_one_per_sample {
set -ex -o pipefail

# find 90% memory
mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh`
mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90`

read_utils.py merge_bams \
"${sep=' ' inputBams}" \
Expand Down
19 changes: 17 additions & 2 deletions util/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,18 +413,33 @@ def available_cpu_count():
Adapted from http://stackoverflow.com/a/1006301/715090
"""

cgroup_cpus = MAX_INT32
try:
def get_cpu_val(name):
return float(util.file.slurp_file('/sys/fs/cgroup/cpu/cpu.'+name).strip())
cfs_quota = get_cpu_val('cfs_quota_us')
cfs_period = get_cpu_val('cfs_period_us')
log.debug('cfs_quota %s, cfs_period %s', cfs_quota, cfs_period)
cgroup_cpus = max(1, int(cfs_quota / cfs_period))
except Exception:
pass

proc_cpus = MAX_INT32
try:
with open('/proc/self/status') as f:
status = f.read()
m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status)
if m:
res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
if res > 0:
return min(res, multiprocessing.cpu_count())
proc_cpus = res
except IOError:
pass

return multiprocessing.cpu_count()
log.debug('cgroup_cpus %d, proc_cpus %d, multiprocessing cpus %d',
cgroup_cpus, proc_cpus, multiprocessing.cpu_count())
return min(cgroup_cpus, proc_cpus, multiprocessing.cpu_count())

def sanitize_thread_count(threads=None, tool_max_cores_value=available_cpu_count):
''' Given a user specified thread count, this function will:
Expand Down

0 comments on commit 3122be6

Please sign in to comment.