Merge pull request #51 from pauldmccarthy/bf/pickle-serialise
WIP: BF, CI: Fix serialise bug
pauldmccarthy authored Jan 2, 2021
2 parents ce9ca63 + d46c012 commit dddcd81
Showing 3 changed files with 102 additions and 14 deletions.
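The bug this commit fixes appears to stem from the removed lines below: __reduce__ used to export the index into an io.BytesIO object, but export_index hands the file object to zran_export_index via fdopen(fileobj.fileno(), ...), and an in-memory stream has no OS-level file descriptor. A minimal illustration of that failure mode (not part of the commit):

    import io

    buf = io.BytesIO()
    try:
        # BytesIO is a pure in-memory stream; it cannot be turned into a C-level FILE*
        buf.fileno()
    except io.UnsupportedOperation as exc:
        print('no file descriptor available:', exc)

The new code therefore routes the index data through a named temporary file in both the export and import directions.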
54 changes: 40 additions & 14 deletions indexed_gzip/indexed_gzip.pyx
@@ -36,8 +36,10 @@ from cpython.buffer cimport (PyObject_GetBuffer,
cimport indexed_gzip.zran as zran

import io
import os
import pickle
import warnings
import tempfile
import contextlib
import threading
import logging
@@ -734,12 +736,12 @@ cdef class _IndexedGzipFile:

else:
close_file = False
if fileobj.mode != 'wb':
if getattr(fileobj, 'mode', 'wb') != 'wb':
raise ValueError(
'File should be opened write-only binary mode.')
'File should be opened in writeable binary mode.')

try:
fd = fdopen(fileobj.fileno(), 'ab')
fd = fdopen(fileobj.fileno(), 'wb')
ret = zran.zran_export_index(&self.index, fd)
if ret != zran.ZRAN_EXPORT_OK:
raise ZranError('export_index returned error: {}'.format(ret))
@@ -776,7 +778,7 @@

else:
close_file = False
if fileobj.mode != 'rb':
if getattr(fileobj, 'mode', 'rb') != 'rb':
raise ValueError(
'File should be opened read-only binary mode.')

@@ -948,15 +950,28 @@ class IndexedGzipFile(io.BufferedReader):
'with an open file object, or that has been created '
'with drop_handles=False')

# export and serialise the
# index if any index points
# have been created
if fobj.npoints > 0:
index = io.BytesIO()
self.export_index(fileobj=index)
else:
# export and serialise the index if
# any index points have been created.
# The index data is serialised as a
# bytes object.
if fobj.npoints == 0:
index = None

else:
# zran.c:zran_export_index requires a file
            # descriptor, so we give it a temporary
# file, and then read the bytes into memory.
tmpfile = None
try:
tmpfile = tempfile.NamedTemporaryFile(delete=False)
tmpfile.close()
self.export_index(tmpfile.name)
with open(tmpfile.name, 'rb') as f:
index = f.read()
finally:
if tmpfile is not None:
os.remove(tmpfile.name)

state = {
'filename' : fobj.filename,
'auto_build' : fobj.auto_build,
@@ -985,9 +1000,20 @@ def unpickle(state):
gzobj = IndexedGzipFile(**state)

if index is not None:
index.seek(0)
gzobj.import_index(fileobj=index)
index.close()
tmpfile = None
try:
# zran.c:zran_import_index requires an
# actual file with a file descriptor,
# so we write the index data out to a
# temp file, and then pass the file in.
tmpfile = tempfile.NamedTemporaryFile(delete=False)
tmpfile.write(index)
tmpfile.close()
gzobj.import_index(tmpfile.name)

finally:
if tmpfile is not None:
os.remove(tmpfile.name)
gzobj.seek(tell)

return gzobj
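Taken together, the two hunks above make the following round trip work; a minimal sketch, assuming an existing gzip file named 'example.gz' and the default drop_handles=True (objects built from an open file handle, or with drop_handles=False, still refuse to pickle, as the error message above states):

    import pickle
    import indexed_gzip as igzip

    gzf  = igzip.IndexedGzipFile('example.gz')
    data = gzf.read(1024)              # advances the offset (and may create index points)
    blob = pickle.dumps(gzf)           # __reduce__ exports the index via a temp file
    gzf2 = pickle.loads(blob)          # unpickle() re-imports the index...
    assert gzf2.tell() == gzf.tell()   # ...and seeks back to the saved offset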
59 changes: 59 additions & 0 deletions indexed_gzip/tests/ctest_indexed_gzip.pyx
@@ -12,6 +12,7 @@ import itertools as it
import functools as ft
import subprocess as sp
import multiprocessing as mp
import copy as cp
import sys
import time
import gzip
@@ -167,6 +168,7 @@ def test_init_success_cases(concat, drop):
gf2.close()
gf3.close()


def test_create_from_open_handle(testfile, nelems, seed, drop):

f = open(testfile, 'rb')
@@ -770,8 +772,10 @@ def test_picklable():

gzf = igzip.IndexedGzipFile(fname)
first50MB = gzf.read(1048576 * 50)
gzf.seek(gzf.tell())
pickled = pickle.dumps(gzf)
second50MB = gzf.read(1048576 * 50)
gzf.seek(gzf.tell())

gzf.close()
del gzf
@@ -799,6 +803,61 @@ def test_picklable():
del gzf


def test_copyable():
fname = 'test.gz'

with tempdir():
data = np.random.randint(1, 1000, (10000, 10000), dtype=np.uint32)
with gzip.open(fname, 'wb') as f:
f.write(data.tobytes())
del f

gzf = igzip.IndexedGzipFile(fname)
gzf_copy = cp.deepcopy(gzf)
first50MB = gzf.read(1048576 * 50)
gzf.seek(gzf.tell())
gzf_copy2 = cp.deepcopy(gzf)
second50MB = gzf.read(1048576 * 50)
gzf.seek(gzf.tell())

gzf.close()
del gzf

assert gzf_copy.tell() == 0
assert gzf_copy2.tell() == 1048576 * 50
assert gzf_copy.read(1048576 * 50) == first50MB
assert gzf_copy2.read(1048576 * 50) == second50MB
gzf_copy2.seek(0)
assert gzf_copy2.read(1048576 * 50) == first50MB
gzf_copy.close()
gzf_copy2.close()
del gzf_copy
del gzf_copy2

with tempdir():
data = np.random.randint(1, 1000, 50000, dtype=np.uint32)
with gzip.open(fname, 'wb') as f:
f.write(data.tobytes())
del f

# if drop_handles=False, no copy
gzf = igzip.IndexedGzipFile(fname, drop_handles=False)

with pytest.raises(pickle.PicklingError):
gzf_copy = cp.deepcopy(gzf)
gzf.close()
del gzf

# If passed an open filehandle, no copy
with open(fname, 'rb') as fobj:
gzf = igzip.IndexedGzipFile(fileobj=fobj)
with pytest.raises(pickle.PicklingError):
gzf_copy = cp.deepcopy(gzf)
gzf.close()
del gzf
del fobj


def _mpfunc(gzf, size, offset):
gzf.seek(offset)
bytes = gzf.read(size)
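The deepcopy test above goes through the same __reduce__/unpickle path as pickling, and the practical payoff is being able to hand an IndexedGzipFile to worker processes, which is what _mpfunc supports in the multiprocessing tests. A rough sketch of that usage pattern, assuming a pre-existing 'example.gz' (the file name and chunk sizes are illustrative only):

    import multiprocessing as mp
    import indexed_gzip as igzip

    def read_chunk(args):
        gzf, offset, size = args
        gzf.seek(offset)               # each worker receives its own unpickled copy
        return gzf.read(size)

    if __name__ == '__main__':
        gzf  = igzip.IndexedGzipFile('example.gz')
        jobs = [(gzf, i * 65536, 65536) for i in range(4)]
        with mp.Pool(2) as pool:
            chunks = pool.map(read_chunk, jobs)   # gzf is pickled into each task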
3 changes: 3 additions & 0 deletions indexed_gzip/tests/test_indexed_gzip.py
@@ -144,6 +144,9 @@ def test_size_multiple_of_readbuf():
def test_picklable():
ctest_indexed_gzip.test_picklable()

def test_copyable():
ctest_indexed_gzip.test_copyable()

@pytest.mark.slow_test
def test_multiproc_serialise():
ctest_indexed_gzip.test_multiproc_serialise()
