Skip to content

Commit

Permalink
Added Disk and MemoryRevolver classes; Added DiskStorage class; Added…
Browse files Browse the repository at this point in the history
… Disk and MemoryRevolver test cases to test_storage.py
  • Loading branch information
aluamorim committed Mar 11, 2021
1 parent dfd8cdc commit ee57651
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 26 deletions.
106 changes: 82 additions & 24 deletions pyrevolve/pyrevolve.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from abc import ABCMeta, abstractproperty, abstractmethod

import numpy as np


from . import crevolve as cr
from .compression import init_compression as init
from .schedulers import Revolve, Action
from .profiling import Profiler
from .storage import NumpyStorage, BytesStorage
from .storage import NumpyStorage, BytesStorage, DiskStorage


class Operator(object):
Expand Down Expand Up @@ -45,11 +43,10 @@ def nbytes(self):
return self.size * np.dtype(self.dtype).itemsize


class Revolver(object):
class BaseRevolver(object):
"""
This should be the only user-facing class in here. It manages the
interaction between the operators passed by the user, and the data storage.
TODO:
* Reverse operator is always called for a single step. Change this.
* Avoid redundant data stores if higher-order stencils save multiple
Expand All @@ -59,10 +56,11 @@ class Revolver(object):
* Find a better name than `checkpoint`, as the object with that name
stores live data rather than one of the checkpoints.
"""
__metaclass__ = ABCMeta

def __init__(self, checkpoint, fwd_operator, rev_operator,
n_checkpoints=None, n_timesteps=None, timings=None,
compression_params=None):
profiler=None):
"""Initialise checkpointer for a given forward- and reverse operator, a
given number of time steps, and a given storage strategy. The number of
time steps must currently be provided explicitly, and the storage must
Expand All @@ -71,32 +69,25 @@ def __init__(self, checkpoint, fwd_operator, rev_operator,
raise Exception("Online checkpointing not yet supported. Specify \
number of time steps!")
if(n_checkpoints is None):
n_checkpoints = cr.adjust(n_timesteps)
if compression_params is None:
compression_params = {'scheme': None}
self.n_checkpoints = cr.adjust(n_timesteps)
else:
self.n_checkpoints = n_checkpoints

if profiler is None:
self.profiler = Profiler()
else:
self.profiler = profiler

self.timings = timings
self.fwd_operator = fwd_operator
self.rev_operator = rev_operator
self.checkpoint = checkpoint
compressor, decompressor = init(compression_params)
self.profiler = Profiler()

if compression_params['scheme'] is None:
self.storage = NumpyStorage(checkpoint.size, n_checkpoints,
checkpoint.dtype,
profiler=self.profiler)
else:
self.storage = BytesStorage(checkpoint.nbytes, n_checkpoints,
checkpoint.dtype, auto_pickle=True,
compression=(compressor, decompressor))
self.n_timesteps = n_timesteps

self.scheduler = Revolve(n_checkpoints, n_timesteps)
self.scheduler = Revolve(self.n_checkpoints, n_timesteps)

def apply_forward(self):
"""Executes only the forward computation while storing checkpoints,
then returns."""

while(True):
# ask Revolve what to do next.
action = self.scheduler.next()
Expand Down Expand Up @@ -127,7 +118,6 @@ def apply_reverse(self):
with self.profiler.get_timer('reverse', 'reverse'):
self.rev_operator.apply(t_start=self.scheduler.capo,
t_end=self.scheduler.capo+1)

while(True):
# ask Revolve what to do next.
action = self.scheduler.next()
Expand Down Expand Up @@ -163,3 +153,71 @@ def save_checkpoint(self):
def load_checkpoint(self):
locations = self.checkpoint.get_data_location(self.scheduler.capo)
self.storage.load(self.scheduler.cp_pointer, locations)

@abstractmethod
def createStorage(self):
return NotImplemented


class DiskRevolver(BaseRevolver):
    """
    Specialization of BaseRevolver whose checkpoints are kept in a
    DiskStorage object.

    On top of the arguments accepted by BaseRevolver, two keyword
    arguments are accepted and forwarded unchanged to DiskStorage:

    'filedir': base directory handed to DiskStorage (default "./").
    'singlefile': handed to DiskStorage to choose between one shared
                  checkpoint file and one file per checkpoint
                  (default True).
    """
    def __init__(self, checkpoint, fwd_operator, rev_operator,
                 n_checkpoints=None, n_timesteps=None, timings=None,
                 filedir="./", singlefile=True):
        super().__init__(checkpoint, fwd_operator, rev_operator,
                         n_checkpoints=n_checkpoints,
                         n_timesteps=n_timesteps,
                         timings=timings)
        # The storage options must be set before createStorage() reads them.
        self.filedir, self.singlefile = filedir, singlefile
        self.createStorage()

    def createStorage(self):
        """Build the DiskStorage backend and bind it to `self.storage`."""
        storage_options = {'filedir': self.filedir,
                           'singlefile': self.singlefile}
        self.storage = DiskStorage(self.checkpoint.size,
                                   self.n_checkpoints,
                                   self.checkpoint.dtype,
                                   self.profiler,
                                   **storage_options)


class MemoryRevolver(BaseRevolver):
    """
    Specialization of BaseRevolver that keeps its checkpoints in memory.

    A NumpyStorage is used when `compression_params['scheme']` is None
    (also the case when no `compression_params` are given); otherwise a
    BytesStorage is created with the compressor/decompressor pair
    returned by `init(compression_params)`.
    """
    def __init__(self, checkpoint, fwd_operator, rev_operator,
                 n_checkpoints=None, n_timesteps=None, timings=None,
                 compression_params=None):
        super().__init__(checkpoint, fwd_operator, rev_operator,
                         n_checkpoints, n_timesteps, timings)

        # Missing parameters mean "no compression".
        self.compression_params = ({'scheme': None}
                                   if compression_params is None
                                   else compression_params)
        self.createStorage()

    def createStorage(self):
        """Build the in-memory backend and bind it to `self.storage`."""
        uncompressed = self.compression_params['scheme'] is None
        if uncompressed:
            self.storage = NumpyStorage(self.checkpoint.size,
                                        self.n_checkpoints,
                                        self.checkpoint.dtype,
                                        profiler=self.profiler)
            return

        codec = init(self.compression_params)
        compressor, decompressor = codec
        self.storage = BytesStorage(self.checkpoint.nbytes,
                                    self.n_checkpoints,
                                    self.checkpoint.dtype,
                                    auto_pickle=True,
                                    compression=(compressor, decompressor))


# Backward compatibility: earlier test cases and user code refer to this
# class by the name `Revolver`.
Revolver = MemoryRevolver
115 changes: 115 additions & 0 deletions pyrevolve/storage.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,125 @@
import numpy as np
import datetime
from functools import reduce
from operator import mul


from .logger import logger
from .compression import CompressedObject
import pickle
import os
import shutil


class DiskStorage(object):
    """
    Stores all checkpoints in one or multiple .dat binary files,
    depending on the 'singlefile' flag. The []-operator is overloaded to
    return a file object with its file-pointer placed according to
    the checkpoint key.
    Revolve will typically use this as LIFO, but the storage also supports
    random access. By default, the .dat files created by this object are
    removed when the object is destroyed.
    """

    def __init__(self, size_ckp, n_ckp, dtype, profiler, filedir="./",
                 singlefile=True):
        """
        Requires number of checkpoints and size of one checkpoint.

        'size_ckp': number of elements in one checkpoint.
        'n_ckp': number of checkpoint slots.
        'dtype': element type used to compute byte offsets and to read
                 checkpoints back.
        'profiler': object whose `get_timer(section, action)` returns a
                    context manager.
        'filedir': base directory where a dat/ folder is created.
                   All .dat binary files are stored in 'filedir/dat/'.
                   None is treated as the current directory.
        'singlefile': lets the user decide whether to use one or
                      multiple files to store checkpoints.
        """
        # Local import: only needed here, to build a collision-free name.
        import uuid

        self.size_ckp = size_ckp
        self.n_ckp = n_ckp
        self.dtype = dtype
        self.profiler = profiler
        # Normalised so that truthy non-bool flags select single-file mode
        # consistently in __init__, save() and load().
        self.singlefile = bool(singlefile)

        # Stream attributes exist from the start so that __del__ is safe
        # even if opening the files below fails.
        self.storage_w = None
        self.storage_r = None
        self.default_storage = None
        self.storage_size = 0
        self.shapes = {}

        base_dir = "." if filedir is None else filedir
        # The trailing "" keeps a trailing separator on the directory name.
        self.filedir = os.path.join(base_dir, "dat", "")
        # exist_ok avoids the check-then-create race between processes.
        os.makedirs(self.filedir, exist_ok=True)

        # A timestamp with one-second resolution plus the PID is not unique
        # for two storages created by the same process within one second
        # (the second would truncate the first one's file), hence the
        # random suffix.
        stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        self.filename = os.path.join(
            self.filedir,
            "CKP_D{}_PID{}_{}.dat".format(stamp, os.getpid(),
                                          uuid.uuid4().hex))

        if self.singlefile:
            self.storage_w = open(self.filename, 'bw+')
            self.storage_r = open(self.filename, 'br+')
            self.default_storage = self.storage_w

    def __del__(self):
        """
        Closes the streams and removes the files created by this object.

        Only files owned by this instance are deleted; the dat/ directory
        itself is removed only when it is left empty, so other storages
        sharing the directory keep their data. Cleanup is best-effort:
        OS errors are ignored.
        """
        # getattr: __init__ may have raised before setting the attributes.
        for stream in (getattr(self, 'storage_w', None),
                       getattr(self, 'storage_r', None)):
            if stream is not None:
                try:
                    stream.close()
                except OSError:
                    pass

        filename = getattr(self, 'filename', None)
        if filename is None:
            return

        owned = [filename]
        owned.extend(self._ckp_filename(key)
                     for key in getattr(self, 'shapes', {}))
        for path in owned:
            try:
                os.remove(path)
            except OSError:
                # Never created (e.g. base name in multi-file mode) or
                # already gone.
                pass

        try:
            os.rmdir(self.filedir)
        except OSError:
            # Directory not empty (still used by someone else) or gone.
            pass

    def _ckp_filename(self, key):
        """Returns the name of the file holding checkpoint `key` in
        multi-file mode."""
        return self.filename + (".k%d" % (key))

    def setW(self):
        ''' Sets WRITE stream as default '''
        self.default_storage = self.storage_w

    def setR(self):
        ''' Sets READ stream as default '''
        self.default_storage = self.storage_r

    def __getitem__(self, key):
        """Returns the current default stream with its file-pointer moved
        to the start of the checkpoint with number `key`."""
        assert(key < self.n_ckp)
        noffset = key*self.size_ckp
        if self.storage_size > 0:
            assert(noffset <= self.storage_size)
        foffset = noffset*(np.dtype(self.dtype).itemsize)
        # Moves the file-pointer to the position determined by 'key'.
        self.default_storage.seek(foffset, os.SEEK_SET)
        return self.default_storage

    def save(self, key, data_pointers):
        """
        Writes the arrays in `data_pointers` back to back into slot `key`
        and records their shapes in `self.shapes[key]`.

        Each array must have a contiguous last dimension (asserted).
        """
        with self.profiler.get_timer('storage', 'copy_save'):
            shapes = []
            if self.singlefile:
                self.setW()
                slot = self[key]
            else:
                slot = open(self._ckp_filename(key), 'bw+')

            try:
                for ptr in data_pointers:
                    assert(ptr.strides[-1] == ptr.itemsize)
                    with self.profiler.get_timer('storage', 'flatten'):
                        data = ptr.ravel()
                    data.tofile(slot)
                    # Make the data visible to the separate read stream.
                    slot.flush()
                    # Incremented once per array written.
                    self.storage_size += self.size_ckp
                    shapes.append(ptr.shape)
                self.shapes[key] = shapes
            finally:
                # The per-checkpoint file is closed even if writing fails;
                # the shared single-file streams stay open.
                if not self.singlefile:
                    slot.close()

    def load(self, key, locations):
        """
        Reads checkpoint `key` back into the arrays in `locations`.

        One array is filled per shape recorded by save() for this key;
        the number of elements read for each is given by the shape of
        the destination array.
        """
        with self.profiler.get_timer('storage', 'copy_load'):
            if self.singlefile:
                self.setR()
                slot = self[key]
            else:
                slot = open(self._ckp_filename(key), 'rb')

            try:
                for _, ptr in zip(self.shapes[key], locations):
                    # Initial value 1 makes 0-d destinations work too.
                    size = reduce(mul, ptr.shape, 1)
                    ckp = np.fromfile(slot, dtype=self.dtype, count=size)
                    np.copyto(ptr, ckp.reshape(ptr.shape))
            finally:
                if not self.singlefile:
                    slot.close()


class NumpyStorage(object):
Expand Down
Binary file not shown.
71 changes: 71 additions & 0 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from pyrevolve.compression import init_compression, compressors_available
from pyrevolve.storage import BytesStorage
from utils import SimpleOperator, SimpleCheckpoint
from utils import IncrementCheckpoint, IncOperator
from pyrevolve import DiskRevolver, MemoryRevolver
import numpy as np
import pytest

Expand All @@ -23,3 +26,71 @@ def test_save_and_restore_with_compression(scheme):

assert(np.allclose(a+i, a1))
assert(np.allclose(b+i, b1))


@pytest.mark.parametrize("nt, ncp", [(10, 2), (10, 4), (10, 6), (10, 8),
                                     (10, 9), (10, 10), (10, 11), (10, 12)])
@pytest.mark.parametrize("singlefile", [True, False])
@pytest.mark.parametrize("diskckp", [True, False])
def test_forward_nt(nt, ncp, singlefile, diskckp):
    """After apply_forward() the forward operator's counter must equal nt,
    for both the disk-backed and the in-memory revolver."""
    fwd_data = np.zeros([nt, ncp])
    rev_data = np.zeros([nt, ncp])
    cp = IncrementCheckpoint([fwd_data, rev_data])
    f = IncOperator(1, fwd_data)
    b = IncOperator(-1, rev_data)

    # `singlefile` only matters for the disk-backed revolver.
    if diskckp is not True:
        rev = MemoryRevolver(cp, f, b, ncp, nt)
    else:
        rev = DiskRevolver(cp, f, b, ncp, nt,
                           filedir="./", singlefile=singlefile)

    assert f.counter == 0
    rev.apply_forward()
    assert f.counter == nt


@pytest.mark.parametrize("nt, ncp", [(10, 2), (10, 4), (10, 6), (10, 8),
                                     (10, 9), (10, 10), (10, 11), (10, 12)])
@pytest.mark.parametrize("singlefile", [True, False])
@pytest.mark.parametrize("diskckp", [True, False])
def test_reverse_nt(nt, ncp, singlefile, diskckp):
    """After a forward and a reverse sweep the reverse operator's counter
    must equal nt and the reverse data array must contain only zeros."""
    fwd_data = np.zeros([nt, ncp])
    rev_data = np.zeros([nt, ncp])
    cp = IncrementCheckpoint([fwd_data])
    f = IncOperator(1, fwd_data)
    b = IncOperator(-1, fwd_data, rev_data)

    # `singlefile` only matters for the disk-backed revolver.
    if diskckp is not True:
        rev = MemoryRevolver(cp, f, b, ncp, nt)
    else:
        rev = DiskRevolver(cp, f, b, ncp, nt,
                           filedir="./", singlefile=singlefile)

    rev.apply_forward()
    assert b.counter == 0
    rev.apply_reverse()
    assert b.counter == nt
    assert np.count_nonzero(rev_data) == 0


@pytest.mark.parametrize("nt, ncp", [(10, 2), (10, 4), (10, 6), (10, 8),
                                     (10, 9), (10, 10), (10, 11), (10, 12)])
@pytest.mark.parametrize("singlefile", [True, False])
@pytest.mark.parametrize("diskckp", [True, False])
def test_num_loads_and_saves(nt, ncp, singlefile, diskckp):
    """No checkpoint may be loaded during the forward sweep, and after the
    reverse sweep the load count must be at least the save count."""
    cp = SimpleCheckpoint()
    f = SimpleOperator()
    b = SimpleOperator()

    # `singlefile` only matters for the disk-backed revolver.
    if diskckp is not True:
        rev = MemoryRevolver(cp, f, b, ncp, nt)
    else:
        rev = DiskRevolver(cp, f, b, ncp, nt,
                           filedir="./", singlefile=singlefile)

    rev.apply_forward()
    assert cp.load_counter == 0

    rev.apply_reverse()
    assert cp.load_counter >= cp.save_counter
Loading

0 comments on commit ee57651

Please sign in to comment.