Add Delta table support for filesystem destination #1382

Open · wants to merge 22 commits into devel
Conversation

@jorritsandbrink (Collaborator) commented May 17, 2024

Description

This PR enables writing datasets to Delta tables in the filesystem destination.

A user can specify delta as table_format in a resource definition:

@dlt.resource(table_name="a_delta_table", table_format="delta")
def a_resource():
    ...
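
For context, a minimal sketch of running such a resource against the filesystem destination (pipeline and dataset names are illustrative; the bucket URL comes from configuration as usual):

import dlt

@dlt.resource(table_name="a_delta_table", table_format="delta")
def a_resource():
    yield [{"id": 1, "value": "foo"}, {"id": 2, "value": "bar"}]

pipeline = dlt.pipeline(
    pipeline_name="delta_demo",  # illustrative name
    destination="filesystem",
    dataset_name="delta_data",  # illustrative name
)
load_info = pipeline.run(a_resource())  # the table is written as a Delta table
print(load_info)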

Related Issues

Contributes to #978

@jorritsandbrink added the enhancement (New feature or request) label May 17, 2024
@jorritsandbrink self-assigned this May 17, 2024
netlify bot commented May 17, 2024

Deploy Preview for dlt-hub-docs ready: https://deploy-preview-1382--dlt-hub-docs.netlify.app (latest commit 1e341cf)

@@ -309,8 +309,12 @@ def restore_file_load(self, file_path: str) -> LoadJob:
"""Finds and restores already started loading job identified by `file_path` if destination supports it."""
pass

def can_do_logical_replace(self, table: TTableSchema) -> bool:
Collaborator Author

Perhaps this can become a destination capability if we turn Delta into a full destination.

remote_path = self.make_remote_path()
if self.is_local_filesystem:
client.fs_client.makedirs(self.pathlib.dirname(remote_path), exist_ok=True)
client.fs_client.put_file(local_path, remote_path)

def make_remote_path(self) -> str:
Collaborator Author

A separation is made because Delta tables are managed at the folder level, while file formats are managed at the file level.

@@ -257,6 +257,11 @@ def assert_all_data_types_row(
else:
db_mapping[binary_col] = bytes(db_mapping[binary_col])

# `delta` table format stores `wei` type as string
Collaborator Author

Decimals with precision higher than 38 are cast to string, because pyarrow does not allow downcasting a decimal to a lower precision.
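
For illustration, a rough sketch of that behaviour in pyarrow (assuming a pyarrow version that supports decimal-to-string casts; this is not code from the PR):

from decimal import Decimal
import pyarrow as pa
import pyarrow.compute as pc

# wei values need more precision than decimal128 can hold (its maximum precision is 38)
arr = pa.array([Decimal(10**40)], type=pa.decimal256(76, 0))

# downcasting to decimal128(38, 0) would lose precision, so the value is stored as a string instead
as_string = pc.cast(arr, pa.string())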


assert isinstance(self.config.credentials, AwsCredentials)
storage_options = self.config.credentials.to_session_credentials()
storage_options["AWS_REGION"] = self.config.credentials.region_name
Collaborator Author

The deltalake library requires that AWS_REGION is provided. We need to add it to DLT_SECRETS_TOML under [destination.filesystem.credentials] to make s3 tests pass on CI.

Collaborator

yeah! this may also come from the machine's default credentials. nevertheless we should warn or exit when it is not set


assert isinstance(self.config.credentials, GcpServiceAccountCredentials)
gcs_creds = self.config.credentials.to_gcs_credentials()
gcs_creds["token"]["private_key_id"] = "921837921798379812"
Collaborator Author

This must be changed so that private_key_id is fetched from configuration.

Collaborator

hmmm OK, when you authenticate in Python you do not need to do that... we can add this as an optional field. does this also mean that OAuth authentication will not work? I think it is fine.

btw, can delta-rs find default Google credentials? you can check has_default_credentials() and then leave the token as None; that works for fsspec

storage_options = self.config.credentials.to_session_credentials()
storage_options["AWS_REGION"] = self.config.credentials.region_name
# https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/#enable-unsafe-writes-in-s3-opt-in
storage_options["AWS_S3_ALLOW_UNSAFE_RENAME"] = "true"
Collaborator Author

Setting AWS_S3_ALLOW_UNSAFE_RENAME to true is the simplest setup. Perhaps we can later extend and let the user configure a locking provider.

Context: https://delta-io.github.io/delta-rs/usage/writing/writing-to-s3-with-locking-provider/.
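
For reference, a sketch of what a locking-provider setup could look like based on those docs (option names as documented by delta-rs; region and table names are placeholders):

# passed as `storage_options` to the deltalake writer
storage_options = {
    "AWS_REGION": "eu-central-1",  # placeholder region
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",  # use DynamoDB locking instead of unsafe renames
    "DELTA_DYNAMO_TABLE_NAME": "delta_log",  # placeholder DynamoDB table used for the lock
}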

Collaborator

interesting. we have a locking provider, but it is probably not compatible with delta. it is called transactional_file.py

@jorritsandbrink (Collaborator Author)

@rudolfix Can you review?

Delta tables are managed at the folder level, not the file level. Hence, they are treated differently than the csv, jsonl, and parquet file formats.

I'll add docs after we've settled on the user interface.

@rudolfix (Collaborator) left a comment

ohhh I was sure we can have delta tables as a single file. when I look at your code I think we should do something else:

make a destination out of that. it could be based on filesystem and use the same credentials. OFC we support only append and replace.

  1. we do not use file format. we should use table_format and add delta to it (along with pyiceberg)
  2. you can create different jobs within the filesystem destination depending on table_format - check how we do it in athena. I think here it will be much simpler
  3. you can then separate delta code from regular file code, and in the future we can add pyiceberg support easily
  4. I'm not sure we can add merge support this way... however, we can always abuse the existing merge mechanism (when there's a merge write disposition, the delta job does nothing but requests a followup job, so at the end we process all table files at once)

dlt/pipeline/pipeline.py Outdated (resolved)
def _write_delta_table(
self, path: str, table: "pa.Table", write_disposition: TWriteDisposition # type: ignore[name-defined] # noqa
) -> None:
"""Writes in-memory Arrow table to on-disk Delta table."""
Collaborator

a few questions here:

  1. we can have many files for a given table. are we able to write them at once?
  2. to the above: writing several tables at once in parallel: is it supported? (it should be :))
  3. do we really need to load the parquet file into memory? I know that you clean it up, but we could implement parquet alignment differently, i.e. via another "flavour" of parquet that a given destination can request.

Collaborator Author

I introduced the concept of a "directory job", so 1 is possible now. 2 is also possible. Both are tested in test_pipeline_delta_filesystem_destination. 3 seems not possible, as discussed in chat.


import pyarrow as pa
from deltalake import write_deltalake

def adjust_arrow_schema(
Collaborator

all of those look like utility functions that could be made available independently and also unit tested

Collaborator Author

I moved them to utils.py and made them independent. Haven't added unit tests yet.

@rudolfix mentioned this pull request May 22, 2024
@@ -214,6 +214,20 @@ def exception(self) -> str:
pass


class DirectoryLoadJob:
Collaborator Author

Very minimal for now. Want to get some feedback before further polishing.

@@ -177,6 +178,15 @@ def __str__(self) -> str:
return self.job_id()


class ParsedLoadJobDirectoryName(NamedTuple):
Collaborator Author

Also very minimal. Same as above.


def restore_file_load(self, file_path: str) -> LoadJob:
return EmptyLoadJob.from_file_path(file_path, "completed")

def start_dir_load(self, table: TTableSchema, dir_path: str, load_id: str) -> DirectoryLoadJob:
Collaborator Author

Perhaps iceberg will also be a directory job if we add it.

@jorritsandbrink (Collaborator Author)

@rudolfix Can you review once more?

I addressed some of your feedback. Biggest changes since last review:

  1. Introduction of the "directory job" so multiple files can be loaded at once. Thus far, the concept of a job was tightly coupled with a single file (if I understand correctly). But if there are multiple Parquet files, we want to be able to load them into the Delta table in a single commit. That is now possible. Code is still rough/minimal, but I want some feedback before polishing it up.
  2. Dedicated job for Delta table loads: DeltaLoadFilesystemJob.
  3. Turned Delta-related methods into static utility methods.

file_path = self.package_storage.get_job_file_path(
load_id, PackageStorage.NEW_JOBS_FOLDER, file_name
load_id, PackageStorage.NEW_JOBS_FOLDER, file_name, subfolder
Collaborator Author

This is where a folder gets included in the file path for directory jobs.

@rudolfix (Collaborator) left a comment

overall the direction looks good! several tasks are still left, but we have a clear direction now.

the big unified arrow table that is composed of many files in the dataset worries me a little, but we can do batching in a followup ticket.
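
One possible direction for that followup, not part of this PR: stream record batches into write_deltalake instead of materializing one big table (paths, URI, and options below are placeholders):

import pyarrow.dataset as ds
from deltalake import write_deltalake

file_paths = ["part-0.parquet", "part-1.parquet"]  # placeholder job files
remote_table_uri = "s3://my-bucket/delta_data/a_delta_table"  # placeholder table location
storage_options = {"AWS_REGION": "eu-central-1"}  # placeholder options/credentials

dataset = ds.dataset(file_paths)
write_deltalake(
    table_or_uri=remote_table_uri,
    data=dataset.to_batches(),  # iterator of record batches instead of one in-memory table
    schema=dataset.schema,  # pass the schema explicitly when providing an iterator of batches
    mode="append",
    storage_options=storage_options,
)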


write_delta_table(
path=self.client.make_remote_uri(self.make_remote_path()),
data=ds.dataset(file_paths),
Collaborator

OK this is cool

Collaborator

just one thing: you pass a dataset here, and file_paths may belong to different tables (the top-level table and its children). when writing into the data lake you do:

def ensure_arrow_table(data: Union[pyarrow.Table, pyarrow.dataset.Dataset]) -> pyarrow.Table:
    return data.to_table() if isinstance(data, pyarrow.dataset.Dataset) else data

this will create a single table with a unified schema, so you may get a union with child tables. best if you create a test that has child tables in the table chain
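
A rough sketch of such a test, assuming the table_format hint propagates to child tables and relying on dlt's default child table naming (names are illustrative):

import dlt

@dlt.resource(table_name="parent", table_format="delta")
def parent_with_children():
    # the nested list produces a child table ("parent__children") during normalization
    yield {"id": 1, "children": [{"v": 1}, {"v": 2}]}

pipeline = dlt.pipeline(
    pipeline_name="delta_child_tables",  # illustrative
    destination="filesystem",
    dataset_name="delta_data",  # illustrative
)
pipeline.run(parent_with_children())

# expectation: "parent" and "parent__children" end up as separate Delta tables
# rather than being unioned into a single schema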

path=self.client.make_remote_uri(self.make_remote_path()),
data=ds.dataset(file_paths),
write_disposition=self.table["write_disposition"],
storage_options=_deltalake_storage_options(self.client),
Collaborator

do you think we should have a way to attach additional configuration to the filesystem config? or should we attach such configuration to the resource?

_deltalake_storage_options,
)

file_paths = [job.file_path for job in self.table_jobs]
Collaborator

should we also filter to only parquet files, and warn if there are any other file types?
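
e.g. a small helper along these lines (the shared dlt logger import is an assumption):

from typing import List

from dlt.common import logger  # assumption: dlt's shared logger module

def filter_parquet_paths(file_paths: List[str]) -> List[str]:
    """Keep only parquet files for the Delta load and warn about anything else."""
    parquet_paths = [p for p in file_paths if p.endswith(".parquet")]
    skipped = sorted(set(file_paths) - set(parquet_paths))
    if skipped:
        logger.warning(f"Skipping non-parquet files in Delta table load: {skipped}")
    return parquet_paths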

@@ -269,19 +305,22 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) ->
# where we want to load the state the regular way
if table["name"] == self.schema.state_table_name and not self.config.as_staging:
return DoNothingJob(file_path)
if table["table_format"] == "delta":
Collaborator

hmmm, I'm not sure each resource always has this key.

table_jobs: Optional[Sequence[LoadJobInfo]] = None,
) -> List[NewLoadJob]:
jobs = super().create_table_chain_completed_followup_jobs(table_chain, table_jobs)
table_format = table_chain[0]["table_format"]
Collaborator

same here. is this key required? I do not think so...

from dlt.common.configuration.specs import GcpServiceAccountCredentials

assert isinstance(client.config.credentials, GcpServiceAccountCredentials)
gcs_creds = client.config.credentials.to_gcs_credentials()
Collaborator

we need to upgrade our credentials, right?

temp_file = os.path.join(tempfile.gettempdir(), self._file_name)
with open(temp_file, "w", encoding="utf-8") as f:
f.write(data)
f.write("" if data is None else data)
Collaborator

this looks very suspicious. when is this called with an empty string? that should never happen.

dlt/load/load.py Outdated
@@ -256,8 +256,13 @@ def create_followup_jobs(
if table_chain := get_completed_table_chain(
schema, all_jobs, top_job_table, starting_job.job_file_info().job_id()
):
table_jobs = [
Collaborator

here you do not select all jobs belonging to the table chain, just those of the top table. you should select the jobs whose tables are in the set of tables of table_chain.

by default, child tables inherit settings like write disposition and table format, so they can be loaded together
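
A sketch of the suggested selection (the attribute used to read a job's table name from its parsed file name is an assumption):

from typing import Any, List, Sequence, Set

def select_table_chain_jobs(all_jobs: Sequence[Any], table_chain: Sequence[dict]) -> List[Any]:
    """Select the jobs of every table in the chain, not only those of the top table."""
    chain_table_names: Set[str] = {table["name"] for table in table_chain}
    return [
        job
        for job in all_jobs
        # assumption: the parsed job file name exposes the table the job belongs to
        if job.job_file_info().table_name in chain_table_names
    ]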

@@ -638,11 +638,40 @@ def run(
Returns:
LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please note that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
"""

def ensure_loader_file_format(
Collaborator

this IMO should be moved fully to normalize.py.

  1. we should add supported table formats to destination capabilities. athena has iceberg, filesystem has delta
  2. in normalize.py, w_normalize_files has _get_items_normalizer, which picks the optimal item normalizer and the best file format for the given settings. also look at resolve_best_writer_spec
  3. we know the schema of the table when _get_items_normalizer is called. if it contains a table format (see the sketch below):
  • we check if the destination caps support it; if not, we issue a warning that the table format will most probably be ignored
  • if they do support it, we somehow force parquet (i.e. via the preferred file format or an additional param to the function)
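
A rough sketch of the check described in point 3 (supported_table_formats is the capability proposed in point 1, not an existing attribute; the shared logger import is an assumption):

from typing import Any

from dlt.common import logger  # assumption: dlt's shared logger module

def should_force_parquet(table: dict, caps: Any) -> bool:
    """Return True when the table declares a table format the destination supports."""
    table_format = table.get("table_format")
    if not table_format:
        return False
    supported = getattr(caps, "supported_table_formats", None) or []  # proposed capability
    if table_format not in supported:
        logger.warning(
            f"table format {table_format} is not supported by the destination and will most likely be ignored"
        )
        return False
    # supported: the caller forces parquet, e.g. via the preferred file format or an extra parameter
    return True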

@@ -64,7 +64,7 @@
"dedup_sort",
]
"""Known hints of a column used to declare hint regexes."""
TTableFormat = Literal["iceberg", "parquet", "jsonl"]
TTableFormat = Literal["iceberg", "parquet", "jsonl", "delta"]
Collaborator

I think we should kick out "parquet" and "jsonl" from here. athena uses jsonl but does it badly (and should simply use a file format). we can even comment out the part that creates jsonl tables in athena.

elif table_format == "jsonl":
                sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
                        ({columns})
                        ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
                        LOCATION '{location}';""")

why: to have a clear distinction between file format and table format. I see 3 formats now: iceberg, delta and hive (or pyarrow dataset)

Labels: enhancement (New feature or request)
Project status: In Progress
2 participants