Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion splitgraph/commandline/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,9 @@ def upload_c(remote, file_format, repository, files):

wait_for_load(client, repository.namespace, repository.repository, task_id)

web_url = _construct_repo_url(gql_endpoint=client.endpoint, full_repo=repository) + "/-/tables"
web_url = (
_construct_repo_url(gql_endpoint=client.endpoint, full_repo=repository) + "/latest/-/tables"
)
click.echo()
click.echo(
"Success. See the repository at " + Color.BLUE + web_url + Color.END + " or query it with:"
Expand Down
22 changes: 18 additions & 4 deletions splitgraph/commandline/splitfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
@click.option(
"-o", "--output-repository", help="Repository to store the result in.", type=RepositoryType()
)
def build_c(splitfile, args, output_repository):
@click.option(
"-l",
"--layered-querying",
help="Use writeable layered querying when checking images out. Experimental.",
is_flag=True,
)
def build_c(splitfile, args, output_repository, layered_querying):
"""
Build Splitgraph images.

Expand Down Expand Up @@ -54,7 +60,9 @@ def build_c(splitfile, args, output_repository):
file_name = os.path.splitext(os.path.basename(splitfile.name))[0]
output_repository = Repository.from_schema(file_name)

execute_commands(splitfile.read(), args, output=output_repository)
execute_commands(
splitfile.read(), args, output=output_repository, use_writeable_lq=layered_querying
)


@click.command(name="provenance")
Expand Down Expand Up @@ -205,7 +213,13 @@ def dependents_c(image_spec, source_on, dependents_on):
help="Images to substitute into the reconstructed Splitfile, of the form"
" [NAMESPACE/]REPOSITORY[:HASH_OR_TAG]. Default tag is 'latest'.",
)
def rebuild_c(image_spec, update, against):
@click.option(
"-l",
"--layered-querying",
help="Use writeable layered querying when checking images out. Experimental.",
is_flag=True,
)
def rebuild_c(image_spec, update, against, layered_querying):
"""
Rebuild images against different dependencies.

Expand Down Expand Up @@ -244,4 +258,4 @@ def rebuild_c(image_spec, update, against):

from splitgraph.splitfile.execution import rebuild_image

rebuild_image(image, new_images)
rebuild_image(image, new_images, use_writeable_lq=layered_querying)
32 changes: 22 additions & 10 deletions splitgraph/splitfile/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ def _combine_hashes(hashes: List[str]) -> str:
return sha256("".join(hashes).encode("ascii")).hexdigest()


def _checkout_or_calculate_layer(output: Repository, image_hash: str, calc_func: Callable) -> None:
def _checkout_or_calculate_layer(
output: Repository, image_hash: str, calc_func: Callable, use_writeable_lq: bool = False
) -> None:
# Future optimization here: don't actually check the layer out if it exists -- only do it at Splitfile execution
# end or when a command needs it.

# Have we already calculated this hash?
try:
output.images.by_hash(image_hash).checkout()
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
logging.info(" ---> Using cache")
except ImageNotFoundError:
try:
Expand All @@ -64,6 +66,7 @@ def execute_commands(
params: Optional[Dict[str, str]] = None,
output: Optional[Repository] = None,
output_base: str = "0" * 32,
use_writeable_lq: bool = False,
) -> None:
"""
Executes a series of Splitfile commands.
Expand All @@ -74,6 +77,7 @@ def execute_commands(
:param output: Output repository to execute the Splitfile against.
:param output_base: If not None, a revision that gets checked out for all Splitfile actions to be committed
on top of it.
:param use_writeable_lq: Use writeable layered querying (LQ) when checking images out. Experimental.
"""
if params is None:
params = {}
Expand Down Expand Up @@ -152,16 +156,18 @@ def _initialize_output(output):
raise


def checkout_if_changed(repository: Repository, image_hash: str) -> None:
def checkout_if_changed(
repository: Repository, image_hash: str, use_writeable_lq: bool = False
) -> None:
if (
repository.head is None
or (repository.head.image_hash != image_hash)
or repository.has_pending_changes()
):
repository.images.by_hash(image_hash).checkout()
repository.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
else:
logging.info(
"Skipping checkout of %s as %s has it checked out " "and there have been no changes",
"Skipping checkout of %s as %s has it checked out and there have been no changes",
image_hash,
repository,
)
Expand Down Expand Up @@ -379,7 +385,9 @@ def _calc():
source_mountpoint.delete()


def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
def _execute_custom(
node: Node, output: Repository, use_writeable_lq: bool = False
) -> ProvenanceLine:
assert output.head is not None
command, args = parse_custom_command(node)

Expand Down Expand Up @@ -410,7 +418,7 @@ def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:
if command_hash is not None:
image_hash = _combine_hashes([output_head, command_hash])
try:
output.images.by_hash(image_hash).checkout()
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
logging.info(" ---> Using cache")
return {"type": "CUSTOM"}
except ImageNotFoundError:
Expand All @@ -425,14 +433,16 @@ def _execute_custom(node: Node, output: Repository) -> ProvenanceLine:

# Check, just in case, whether the new hash produced by the command already exists.
try:
output.images.by_hash(image_hash).checkout()
output.images.by_hash(image_hash).checkout(layered=use_writeable_lq)
except ImageNotFoundError:
# Full command as a commit comment
output.commit(image_hash, comment=node.text)
return {"type": "CUSTOM"}


def rebuild_image(image: Image, source_replacement: Dict[Repository, str]) -> None:
def rebuild_image(
image: Image, source_replacement: Dict[Repository, str], use_writeable_lq: bool = False
) -> None:
"""
Recreates the Splitfile used to create a given image and reruns it, replacing its dependencies with a different
set of versions.
Expand All @@ -444,4 +454,6 @@ def rebuild_image(image: Image, source_replacement: Dict[Repository, str]) -> No
ignore_irreproducible=False, source_replacement=source_replacement
)
# Params are supposed to be stored in the commands already (baked in) -- what if there's sensitive data there?
execute_commands("\n".join(splitfile_commands), output=image.repository)
execute_commands(
"\n".join(splitfile_commands), output=image.repository, use_writeable_lq=use_writeable_lq
)
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ Uploading the files...
(SUCCESS) Waiting for task ID ingest_task


Success. See the repository at http://www.example.com/someuser/somerepo_1/-/tables or query it with:
Success. See the repository at http://www.example.com/someuser/somerepo_1/latest/-/tables or query it with:
sgr cloud sql 'SELECT * FROM "someuser/somerepo_1"."base_df"'
30 changes: 23 additions & 7 deletions test/splitgraph/commandline/test_splitfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest import mock
from unittest.mock import call

import pytest
from click.testing import CliRunner

from splitgraph.commandline import build_c, dependents_c, provenance_c, rebuild_c
Expand All @@ -17,11 +18,17 @@ def test_splitfile_default():
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "latest"],
)
assert ec.mock_calls == [
call(mock.ANY, {"TAG": "latest"}, output=Repository("", "import_remote_multiple"))
call(
mock.ANY,
{"TAG": "latest"},
output=Repository("", "import_remote_multiple"),
use_writeable_lq=False,
)
]


def test_splitfile(local_engine_empty, pg_repo_remote):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile(local_engine_empty, pg_repo_remote, use_writeable_lq):
runner = CliRunner()

result = runner.invoke(
Expand All @@ -33,7 +40,8 @@ def test_splitfile(local_engine_empty, pg_repo_remote):
"latest",
"-o",
"output",
],
]
+ (["-l"] if use_writeable_lq else []),
)
assert result.exit_code == 0
assert OUTPUT.run_sql("SELECT id, fruit, vegetable FROM join_table") == [
Expand Down Expand Up @@ -78,17 +86,22 @@ def test_splitfile(local_engine_empty, pg_repo_remote):
assert "%s:%s" % (OUTPUT, OUTPUT.head.image_hash) in result.output


def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag, use_writeable_lq):
runner = CliRunner()

result = runner.invoke(
build_c,
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "v1", "-o", "output"],
[RESOURCES + "import_remote_multiple.splitfile", "-a", "TAG", "v1", "-o", "output"]
+ (["-l"] if use_writeable_lq else []),
)
assert result.exit_code == 0

# Rerun the output:latest against v2 of the test/pg_mount
result = runner.invoke(rebuild_c, ["output:latest", "--against", "test/pg_mount:v2"])
result = runner.invoke(
rebuild_c,
["output:latest", "--against", "test/pg_mount:v2"] + (["-l"] if use_writeable_lq else []),
)
output_v2 = OUTPUT.head
assert result.exit_code == 0
v2 = pg_repo_remote_multitag.images["v2"]
Expand All @@ -98,7 +111,10 @@ def test_splitfile_rebuild_update(local_engine_empty, pg_repo_remote_multitag):
# In this case, this should all resolve to the same version of test/pg_mount (v2) and not produce
# any extra commits.
curr_commits = OUTPUT.images()
result = runner.invoke(rebuild_c, ["output:latest", "-u"])
result = runner.invoke(
rebuild_c,
["output:latest", "-u"] + (["-l"] if use_writeable_lq else []),
)
assert result.exit_code == 0
assert output_v2 == OUTPUT.head
assert OUTPUT.images() == curr_commits
60 changes: 46 additions & 14 deletions test/splitgraph/splitfile/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ def test_local_import_splitfile(pg_repo_local):
assert not OUTPUT.engine.table_exists(OUTPUT.to_schema(), "fruits")


def test_advanced_splitfile(pg_repo_local):
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_advanced_splitfile(pg_repo_local, use_writeable_lq):
execute_commands(
load_splitfile("import_local_multiple_with_queries.splitfile"),
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)

assert OUTPUT.engine.table_exists(OUTPUT.to_schema(), "my_fruits")
assert OUTPUT.engine.table_exists(OUTPUT.to_schema(), "vegetables")
Expand All @@ -128,13 +133,22 @@ def test_advanced_splitfile(pg_repo_local):
assert OUTPUT.run_sql("SELECT * FROM my_fruits") == [(2, "orange")]


def test_splitfile_cached(pg_repo_local):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_cached(pg_repo_local, use_writeable_lq):
# Check that no new commits/snaps are created if we rerun the same splitfile
execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
execute_commands(
load_splitfile("import_local_multiple_with_queries.splitfile"),
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)
images = OUTPUT.images()
assert len(images) == 4

execute_commands(load_splitfile("import_local_multiple_with_queries.splitfile"), output=OUTPUT)
execute_commands(
load_splitfile("import_local_multiple_with_queries.splitfile"),
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)
new_images = OUTPUT.images()
assert new_images == images

Expand Down Expand Up @@ -205,8 +219,14 @@ def test_import_updating_splitfile_with_uploading(


@pytest.mark.mounting
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_end_to_end_with_uploading(
local_engine_empty, remote_engine, pg_repo_remote_multitag, mg_repo_remote, clean_minio
local_engine_empty,
remote_engine,
pg_repo_remote_multitag,
mg_repo_remote,
clean_minio,
use_writeable_lq,
):
# An end-to-end test:
# * Create a derived dataset from some tables imported from the remote engine
Expand All @@ -217,7 +237,10 @@ def test_splitfile_end_to_end_with_uploading(

# Do the same setting up first and run the splitfile against the remote data.
execute_commands(
load_splitfile("import_remote_multiple.splitfile"), params={"TAG": "v1"}, output=OUTPUT
load_splitfile("import_remote_multiple.splitfile"),
params={"TAG": "v1"},
output=OUTPUT,
use_writeable_lq=use_writeable_lq,
)

remote_output = Repository(OUTPUT.namespace, OUTPUT.repository, remote_engine)
Expand All @@ -230,22 +253,31 @@ def test_splitfile_end_to_end_with_uploading(
OUTPUT.objects.cleanup()

stage_2 = R("output_stage_2")
execute_commands(load_splitfile("import_from_preuploaded_remote.splitfile"), output=stage_2)
execute_commands(
load_splitfile("import_from_preuploaded_remote.splitfile"),
output=stage_2,
use_writeable_lq=use_writeable_lq,
)

assert stage_2.run_sql("SELECT id, name, fruit, vegetable FROM diet") == [
(2, "James", "orange", "carrot")
]


@pytest.mark.mounting
def test_splitfile_schema_changes(pg_repo_local, mg_repo_local):
execute_commands(load_splitfile("schema_changes.splitfile"), output=OUTPUT)
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_schema_changes(pg_repo_local, mg_repo_local, use_writeable_lq):
execute_commands(
load_splitfile("schema_changes.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
)
old_output_head = OUTPUT.head

# Then, alter the dataset and rerun the splitfile.
pg_repo_local.run_sql("INSERT INTO fruits VALUES (12, 'mayonnaise')")
pg_repo_local.commit()
execute_commands(load_splitfile("schema_changes.splitfile"), output=OUTPUT)
execute_commands(
load_splitfile("schema_changes.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
)
new_output_head = OUTPUT.head

old_output_head.checkout()
Expand Down Expand Up @@ -438,16 +470,16 @@ def test_splitfile_with_external_sql(readonly_pg_repo):


@pytest.mark.registry
def test_splitfile_inline_sql(readonly_pg_repo, pg_repo_local):
@pytest.mark.parametrize("use_writeable_lq", [True, False])
def test_splitfile_inline_sql(readonly_pg_repo, pg_repo_local, use_writeable_lq):
# Test SQL commands accessing repos directly -- join a remote repo with
# some local data.

prepare_lq_repo(pg_repo_local, commit_after_every=False, include_pk=True)
pg_repo_local.head.tag("v2")

execute_commands(
load_splitfile("inline_sql.splitfile"),
output=OUTPUT,
load_splitfile("inline_sql.splitfile"), output=OUTPUT, use_writeable_lq=use_writeable_lq
)

new_head = OUTPUT.head
Expand Down