diff --git a/docker/calc_mem.py b/docker/calc_mem.py new file mode 100755 index 000000000..c01103eba --- /dev/null +++ b/docker/calc_mem.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +"""Calculate the memory allocated to the process, taking account of cgroups. +Print result to stdout. +""" + +import argparse +import sys +import os +import os.path + +parser = argparse.ArgumentParser('Calculated memory allocated to the process') +parser.add_argument('mem_unit', choices=('mb', 'gb'), help='memory units') +parser.add_argument('mem_fraction', type=int, help='what fraction of total memory to report') +args = parser.parse_args() + +if not (1 <= args.mem_fraction <= 100): + raise RuntimeError("mem_fraction should be in the range [1,100]") + +unit2factor = {'k': 1024, 'm': 1024*1024, 'g': 1024*1024*1024} + +def mem_from_proc_meminfo(): + """Return the total memory, in bytes, as given by /proc/meminfo""" + with open('/proc/meminfo') as f: + for line in f: + if line.startswith('MemTotal:'): + parts = line.strip().split() + val, unit = parts[1:3] + unit_factor = unit2factor[unit[0].lower()] + return int(val) * unit_factor + raise RuntimeError('Could not get MemTotal from /proc/meminfo') + +def mem_from_cgroups(): + """Return the total memory, in bytes, as given by cgroups (or sys.maxsize if not given)""" + cgroups_memlimit_fname = '/sys/fs/cgroup/memory/memory.limit_in_bytes' + if os.path.isfile(cgroups_memlimit_fname): + with open(cgroups_memlimit_fname) as f: + val = f.read().strip() + return int(val) * unit2factor.get(val[-1], 1) + + return sys.maxsize + +mem_in_bytes = min(mem_from_proc_meminfo(), mem_from_cgroups()) +mem_in_units = float(mem_in_bytes) / float(unit2factor[args.mem_unit[0]]) +print(int(mem_in_units * (float(args.mem_fraction) / 100.0))) diff --git a/docker/mem_in_gb_90.sh b/docker/mem_in_gb_90.sh deleted file mode 100755 index 08329350d..000000000 --- a/docker/mem_in_gb_90.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -head -n1 /proc/meminfo | awk '{print int($2*0.9/1024/1024)}' diff --git a/docker/mem_in_mb_50.sh b/docker/mem_in_mb_50.sh deleted file mode 100755 index 1d4347620..000000000 --- a/docker/mem_in_mb_50.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -head -n1 /proc/meminfo | awk '{print int($2*0.5/1024)}' diff --git a/docker/mem_in_mb_80.sh b/docker/mem_in_mb_80.sh deleted file mode 100755 index ae247ad7b..000000000 --- a/docker/mem_in_mb_80.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -head -n1 /proc/meminfo | awk '{print int($2*0.80/1024)}' diff --git a/docker/mem_in_mb_85.sh b/docker/mem_in_mb_85.sh deleted file mode 100755 index 126401f8f..000000000 --- a/docker/mem_in_mb_85.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -head -n1 /proc/meminfo | awk '{print int($2*0.85/1024)}' diff --git a/docker/mem_in_mb_90.sh b/docker/mem_in_mb_90.sh deleted file mode 100755 index 61da08a82..000000000 --- a/docker/mem_in_mb_90.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -head -n1 /proc/meminfo | awk '{print int($2*0.9/1024)}' diff --git a/docker/mem_in_mb_95.sh b/docker/mem_in_mb_95.sh deleted file mode 100755 index 1bd375a00..000000000 --- a/docker/mem_in_mb_95.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -head -n1 /proc/meminfo | awk '{print int($2*0.95/1024)}' diff --git a/pipes/WDL/workflows/tasks/tasks_assembly.wdl b/pipes/WDL/workflows/tasks/tasks_assembly.wdl index accf5dcb1..6f7796706 100644 --- a/pipes/WDL/workflows/tasks/tasks_assembly.wdl +++ b/pipes/WDL/workflows/tasks/tasks_assembly.wdl @@ -18,8 +18,8 @@ task assemble { set -ex -o pipefail # find 90% memory - mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh` - mem_in_gb=`/opt/viral-ngs/source/docker/mem_in_gb_90.sh` + mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90` + mem_in_gb=`/opt/viral-ngs/source/docker/calc_mem.py gb 90` if [[ "${assembler}" == "trinity" ]]; then assembly.py assemble_trinity \ @@ -110,7 +110,7 @@ task scaffold { set -ex -o pipefail # find 90% memory - mem_in_gb=`/opt/viral-ngs/source/docker/mem_in_gb_90.sh` + mem_in_gb=`/opt/viral-ngs/source/docker/calc_mem.py gb 90` assembly.py order_and_orient \ ${contigs_fasta} \ @@ -187,7 +187,7 @@ task refine { set -ex -o pipefail # find 90% memory - mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh` + mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90` # prep GATK mkdir gatk @@ -258,7 +258,7 @@ task refine_2x_and_plot { set -ex -o pipefail # find 90% memory - mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh` + mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90` # prep GATK mkdir gatk diff --git a/pipes/WDL/workflows/tasks/tasks_demux.wdl b/pipes/WDL/workflows/tasks/tasks_demux.wdl index f16a27e50..6bac6facd 100644 --- a/pipes/WDL/workflows/tasks/tasks_demux.wdl +++ b/pipes/WDL/workflows/tasks/tasks_demux.wdl @@ -57,7 +57,7 @@ task illumina_demux { set -ex -o pipefail # find N% memory - mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_85.sh` + mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 85` if [ -d /mnt/tmp ]; then TMPDIR=/mnt/tmp @@ -108,7 +108,7 @@ task illumina_demux { elif [ "$total_tile_count" -le 896 ]; then echo "Detected $total_tile_count tiles, interpreting as HiSeq4k run." elif [ "$total_tile_count" -le 1408 ]; then - mem_in_mb=$(/opt/viral-ngs/source/docker/mem_in_mb_80.sh) + mem_in_mb=$(/opt/viral-ngs/source/docker/calc_mem.py mb 80) demux_threads=20 # with NovaSeq-size output, OOM errors can sporadically occur with higher thread counts echo "Detected $total_tile_count tiles, interpreting as NovaSeq run." echo " **Note: Q20 threshold used since NovaSeq with RTA3 writes only four Q-score values: 2, 12, 23, and 37.**" diff --git a/pipes/WDL/workflows/tasks/tasks_metagenomics.wdl b/pipes/WDL/workflows/tasks/tasks_metagenomics.wdl index 7fa9bb302..b0385eb8e 100644 --- a/pipes/WDL/workflows/tasks/tasks_metagenomics.wdl +++ b/pipes/WDL/workflows/tasks/tasks_metagenomics.wdl @@ -203,7 +203,7 @@ task diamond_contigs { DIAMOND_TAXDB_DIR=$(mktemp -d) # find 90% memory - mem_in_gb=`/opt/viral-ngs/source/docker/mem_in_gb_90.sh` + mem_in_gb=`/opt/viral-ngs/source/docker/calc_mem.py gb 90` # decompress DBs to /mnt/db cat ${diamond_db_lz4} | lz4 -d > $TMPDIR/diamond_db.dmnd & diff --git a/pipes/WDL/workflows/tasks/tasks_taxon_filter.wdl b/pipes/WDL/workflows/tasks/tasks_taxon_filter.wdl index 0e3b8d252..19f43d2fc 100644 --- a/pipes/WDL/workflows/tasks/tasks_taxon_filter.wdl +++ b/pipes/WDL/workflows/tasks/tasks_taxon_filter.wdl @@ -22,8 +22,8 @@ task deplete_taxa { fi # find memory thresholds - mem_in_mb_50=`/opt/viral-ngs/source/docker/mem_in_mb_50.sh` - mem_in_mb_90=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh` + mem_in_mb_50=`/opt/viral-ngs/source/docker/calc_mem.py mb 50` + mem_in_mb_90=`/opt/viral-ngs/source/docker/calc_mem.py mb 90` # bmtagger and blast db args DBS_BMTAGGER="${sep=' ' bmtaggerDbs}" @@ -92,7 +92,7 @@ task filter_to_taxon { set -ex -o pipefail # find 90% memory - mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh` + mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90` taxon_filter.py filter_lastal_bam \ ${reads_unmapped_bam} \ @@ -149,7 +149,7 @@ task merge_one_per_sample { set -ex -o pipefail # find 90% memory - mem_in_mb=`/opt/viral-ngs/source/docker/mem_in_mb_90.sh` + mem_in_mb=`/opt/viral-ngs/source/docker/calc_mem.py mb 90` read_utils.py merge_bams \ "${sep=' ' inputBams}" \ diff --git a/util/misc.py b/util/misc.py index 9a9ba89d3..731f6c64f 100644 --- a/util/misc.py +++ b/util/misc.py @@ -413,6 +413,19 @@ def available_cpu_count(): Adapted from http://stackoverflow.com/a/1006301/715090 """ + + cgroup_cpus = MAX_INT32 + try: + def get_cpu_val(name): + return float(util.file.slurp_file('/sys/fs/cgroup/cpu/cpu.'+name).strip()) + cfs_quota = get_cpu_val('cfs_quota_us') + cfs_period = get_cpu_val('cfs_quota_us') + log.debug('cfs_quota %s, cfs_period %s', cfs_quota, cfs_period) + cgroup_cpus = max(1, int(cfs_quota / cfs_period)) + except Exception: + pass + + proc_cpus = MAX_INT32 try: with open('/proc/self/status') as f: status = f.read() @@ -420,11 +433,13 @@ def available_cpu_count(): if m: res = bin(int(m.group(1).replace(',', ''), 16)).count('1') if res > 0: - return min(res, multiprocessing.cpu_count()) + proc_cpus = res except IOError: pass - return multiprocessing.cpu_count() + log.debug('cgroup_cpus %d, proc_cpus %d, multiprocessing cpus %d', + cgroup_cpus, proc_cpus, multiprocessing.cpu_count()) + return min(cgroup_cpus, proc_cpus, multiprocessing.cpu_count()) def sanitize_thread_count(threads=None, tool_max_cores_value=available_cpu_count): ''' Given a user specified thread count, this function will: