diff --git a/corgie/argparsers.py b/corgie/argparsers.py index 9fcf93b..33c5187 100644 --- a/corgie/argparsers.py +++ b/corgie/argparsers.py @@ -71,13 +71,18 @@ def create_layer_from_dict(param_dict, reference=None, caller_name=None, f'must be of type in {allowed_types}') backend = str_to_backend(data_backend) - layer = backend.create_layer(path=layer_path, layer_type=layer_type, - reference=reference, layer_args=layer_args, - **kwargs) - name = params["name"] if name is None: - name = layer.get_layer_type() + name = params['type'] + + layer = backend.create_layer( + path=layer_path, + layer_type=layer_type, + reference=reference, + name=name, + layer_args=layer_args, + **kwargs) + layer.name = name return layer diff --git a/corgie/boundingcube.py b/corgie/boundingcube.py index 840ed7c..b45613b 100644 --- a/corgie/boundingcube.py +++ b/corgie/boundingcube.py @@ -3,6 +3,7 @@ from math import floor, ceil import numpy as np +from corgie import scheduling from corgie.helpers import crop @@ -37,7 +38,7 @@ def get_bcube_from_vertices(vertices, resolution, mip, cant_be_empty=True): return bcube - +@scheduling.serializable class BoundingCube: def __init__(self, xs, xe, ys, ye, zs, ze, mip): self.m0_x = (None, None) diff --git a/corgie/cli/downsample.py b/corgie/cli/downsample.py index a30b262..4c638eb 100644 --- a/corgie/cli/downsample.py +++ b/corgie/cli/downsample.py @@ -144,17 +144,20 @@ def downsample( scheduler = ctx.obj["scheduler"] corgie_logger.debug("Setting up Source and Destination layers...") - src_layer = create_layer_from_spec( - src_layer_spec, caller_name="src layer", readonly=True - ) + if dst_layer_spec is None: corgie_logger.info( "Destination layer not specified. Using Source layer " "as Destination." ) + src_layer = create_layer_from_spec( + src_layer_spec, caller_name="src layer", readonly=False + ) dst_layer = src_layer - dst_layer.readonly = False else: + src_layer = create_layer_from_spec( + src_layer_spec, caller_name="src layer", readonly=True + ) dst_layer = create_layer_from_spec( dst_layer_spec, caller_name="dst_layer layer", @@ -162,6 +165,7 @@ def downsample( reference=src_layer, overwrite=True, ) + bcube = get_bcube_from_coords(start_coord, end_coord, coord_mip) downsample_job = DownsampleJob( src_layer=src_layer, diff --git a/corgie/cli/downsample_by_spec.py b/corgie/cli/downsample_by_spec.py index c871240..e4994a3 100644 --- a/corgie/cli/downsample_by_spec.py +++ b/corgie/cli/downsample_by_spec.py @@ -71,10 +71,6 @@ def downsample_by_spec( scheduler = ctx.obj["scheduler"] corgie_logger.debug("Setting up Source and Destination layers...") - src_layer = create_layer_from_spec( - src_layer_spec, caller_name="src layer", readonly=True - ) - with open(spec_path, "r") as f: spec = set(json.load(f)) @@ -82,9 +78,15 @@ def downsample_by_spec( corgie_logger.info( "Destination layer not specified. Using Source layer " "as Destination." ) + src_layer = create_layer_from_spec( + src_layer_spec, caller_name="src layer", readonly=False + ) + dst_layer = src_layer - dst_layer.readonly = False else: + src_layer = create_layer_from_spec( + src_layer_spec, caller_name="src layer", readonly=True + ) dst_layer = create_layer_from_spec( dst_layer_spec, caller_name="dst_layer layer", @@ -93,6 +95,7 @@ def downsample_by_spec( chunk_z=chunk_z, overwrite=True, ) + bcube = get_bcube_from_coords(start_coord, end_coord, coord_mip) for z in range(*bcube.z_range()): if z in spec: diff --git a/corgie/cli/upsample.py b/corgie/cli/upsample.py index bed0f44..6a8f4ab 100644 --- a/corgie/cli/upsample.py +++ b/corgie/cli/upsample.py @@ -144,17 +144,20 @@ def upsample( scheduler = ctx.obj["scheduler"] corgie_logger.debug("Setting up Source and Destination layers...") - src_layer = create_layer_from_spec( - src_layer_spec, caller_name="src layer", readonly=True - ) if dst_layer_spec is None: corgie_logger.info( "Destination layer not specified. Using Source layer " "as Destination." ) + src_layer = create_layer_from_spec( + src_layer_spec, caller_name="src layer", readonly=False + ) dst_layer = src_layer dst_layer.readonly = False else: + src_layer = create_layer_from_spec( + src_layer_spec, caller_name="src layer", readonly=True + ) dst_layer = create_layer_from_spec( dst_layer_spec, caller_name="dst_layer layer", diff --git a/corgie/data_backends/base.py b/corgie/data_backends/base.py index 2e6fa3b..b60392a 100644 --- a/corgie/data_backends/base.py +++ b/corgie/data_backends/base.py @@ -1,6 +1,6 @@ import torch -from corgie import exceptions +from corgie import exceptions, scheduling from corgie.log import logger as corgie_logger from corgie.layers import get_layer_types, str_to_layer_type @@ -27,6 +27,7 @@ def register_backend_fn(cls): return register_backend_fn +@scheduling.serializable class DataBackendBase: default_device = None diff --git a/corgie/layers/base.py b/corgie/layers/base.py index e944ff0..1145f2d 100644 --- a/corgie/layers/base.py +++ b/corgie/layers/base.py @@ -3,7 +3,7 @@ from corgie import constants, exceptions from corgie import helpers - +from corgie import scheduling STR_TO_LTYPE_DICT = dict() @@ -27,10 +27,9 @@ def str_to_layer_type(s): def get_layer_types(): return list(STR_TO_LTYPE_DICT.keys()) - +@scheduling.serializable class BaseLayerType: def __init__(self, name=None, device='cpu', readonly=False, **kwargs): - super().__init__(**kwargs) self.device = device self.readonly = readonly self.name = name diff --git a/corgie/layers/volumetric_layers.py b/corgie/layers/volumetric_layers.py index b757476..ce3a262 100644 --- a/corgie/layers/volumetric_layers.py +++ b/corgie/layers/volumetric_layers.py @@ -18,8 +18,8 @@ def get_extra_interpolate_parameters(): return {"recompute_scale_factor": False} class VolumetricLayer(BaseLayerType): - def __init__(self, data_mip=None, **kwargs): - super().__init__(**kwargs) + def __init__(self, data_mip=None, *kargs, **kwargs): + super().__init__(*kargs, **kwargs) self.data_mip = data_mip def read(self, bcube, mip, **kwargs): @@ -286,12 +286,12 @@ def get_default_data_type(self): @register_layer_type("fixed_field") class FixedFieldLayer(FieldLayer): """Residuals are specified at a fixed resolution, regardless of MIP. - For example, a field at MIP2 may have residuals specified in MIP0 + For example, a field at MIP2 may have residuals specified in MIP0 pixels. NOTE: It is not recommended to create new fields based on this class. This class was intended to support existing fields created - by legacy code. + by legacy code. """ def __init__(self, *args, fixed_mip=0, **kwargs): super().__init__(*args, **kwargs) @@ -324,7 +324,7 @@ def read(self, bcube, mip, **kwargs): return super().read(bcube, mip, **kwargs) / (2**(mip-self.fixed_mip)) # def write(self, data_tens, bcube, mip, **kwargs): - # super().write(data_tens=data_tens*(2**self.fixed_mip), - # bcube=indexed_bcube, - # mip=mip, + # super().write(data_tens=data_tens*(2**self.fixed_mip), + # bcube=indexed_bcube, + # mip=mip, # **kwargs) diff --git a/corgie/mipless_cloudvolume.py b/corgie/mipless_cloudvolume.py index 7d980c1..95af0f6 100644 --- a/corgie/mipless_cloudvolume.py +++ b/corgie/mipless_cloudvolume.py @@ -7,6 +7,7 @@ from cloudvolume import CloudVolume, Storage from corgie.log import logger as corgie_logger +from corgie import settings def jsonize_key(*kargs, **kwargs): result = '' @@ -119,6 +120,9 @@ def get_info(self): return tmp_cv.info def store_info(self, info=None): + if settings.IS_WORKER: + return + if not self.allow_info_writes: raise Exception("Attempting to store info to {}, but " "'allow_info_writes' flag is set to False".format(self.path)) diff --git a/corgie/scheduling.py b/corgie/scheduling.py index 9df9351..83946e1 100644 --- a/corgie/scheduling.py +++ b/corgie/scheduling.py @@ -1,4 +1,5 @@ import mazepa +import objectscriber as scriber wait_until_done = mazepa.Barrier Task = mazepa.Task @@ -25,3 +26,5 @@ def __init__(self, *kargs, **kwargs): def create_scheduler(*kargs, **kwargs): return Scheduler(*kargs, **kwargs) +def serializable(cls): + return scriber.register_class(cls) diff --git a/corgie/settings.py b/corgie/settings.py new file mode 100644 index 0000000..95dd482 --- /dev/null +++ b/corgie/settings.py @@ -0,0 +1 @@ +IS_WORKER = False diff --git a/corgie/stack.py b/corgie/stack.py index eb9dfb5..913b517 100644 --- a/corgie/stack.py +++ b/corgie/stack.py @@ -10,11 +10,12 @@ import six -from corgie import exceptions, helpers +from corgie import exceptions, helpers, scheduling from corgie.layers import str_to_layer_type from corgie.log import logger as corgie_logger +@scheduling.serializable class StackBase: def __init__(self, name=None): self.name = name @@ -79,6 +80,23 @@ def get_layer_types(self): return list(layer_types) + def serialize(self, serializer): + spec = {} + spec['name'] = self.name + #spec['folder'] = self.folder + spec['layers'] = serializer.serialize(self.layers) + spec['reference_layer'] = serializer.serialize(self.reference_layer) + return spec + + + @classmethod + def deserialize(cls, spec, serializer): + obj = cls() + obj.name = spec['name'] + obj.layers = serializer.deserialize(spec['layers']) + obj.reference_layer = serializer.deserialize(spec['reference_layer']) + return obj + class Stack(StackBase): def __init__(self, name=None, layer_list=[], folder=None, **kwargs): @@ -195,7 +213,6 @@ def read_data_dict( translation.y += src_field_trans.y # if translation.x != 0 or translation.y != 0: - # import pdb; pdb.set_trace() final_bcube = copy.deepcopy(bcube) final_bcube = final_bcube.translate( x_offset=translation.y, y_offset=translation.x, mip=mip @@ -206,9 +223,6 @@ def read_data_dict( data_dict[global_name] = l.read(bcube=final_bcube, mip=mip) return translation, data_dict - def z_range(self): - return self.bcube.z_range() - def cutout(self): raise NotImplementedError diff --git a/corgie/worker.py b/corgie/worker.py index 4b603b7..79ef443 100644 --- a/corgie/worker.py +++ b/corgie/worker.py @@ -3,7 +3,7 @@ from corgie import scheduling from corgie.log import logger as corgie_logger from corgie.log import configure_logger - +from corgie import settings @click.command() @click.option('--lease_seconds', '-l', nargs=1, type=int, required=True) @@ -11,5 +11,6 @@ @click.option('-v', '--verbose', count=True, help='Turn on debug logging') def worker(lease_seconds, verbose, **kwargs): configure_logger(verbose) + settings.IS_WORKER = True executor = scheduling.parse_executor_from_kwargs(kwargs) executor.execute(lease_seconds=lease_seconds) diff --git a/setup.py b/setup.py index dc34b63..1737106 100644 --- a/setup.py +++ b/setup.py @@ -28,6 +28,7 @@ 'click-option-group', 'click>=7,<8', 'procspec', + 'objectscriber', 'idna>=2.5', 'google-auth>=1.11.0', 'cloud-volume',