prefect_databricks.flows

Module containing flows for interacting with Databricks

Functions

jobs_runs_submit_and_wait_for_completion

jobs_runs_submit_and_wait_for_completion(databricks_credentials: DatabricksCredentials, tasks: Optional[List[RunSubmitTaskSettings]] = None, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, git_source: Optional[GitSource] = None, timeout_seconds: Optional[int] = None, idempotency_token: Optional[str] = None, access_control_list: Optional[List[AccessControlRequest]] = None, return_metadata: bool = False, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]
Flow that triggers a job run and waits for the triggered run to complete. Args:
  • databricks_credentials: Credentials to use for authentication with Databricks.
  • tasks: Tasks to run, e.g.
[
    {
        "task_key": "Sessionize",
        "description": "Extracts session data from events",
        "depends_on": [],
        "existing_cluster_id": "0923-164208-meows279",
        "spark_jar_task": {
            "main_class_name": "com.databricks.Sessionize",
            "parameters": ["--data", "dbfs:/path/to/data.json"],
        },
        "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
        "timeout_seconds": 86400,
    },
    {
        "task_key": "Orders_Ingest",
        "description": "Ingests order data",
        "depends_on": [],
        "existing_cluster_id": "0923-164208-meows279",
        "spark_jar_task": {
            "main_class_name": "com.databricks.OrdersIngest",
            "parameters": ["--data", "dbfs:/path/to/order-data.json"],
        },
        "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
        "timeout_seconds": 86400,
    },
    {
        "task_key": "Match",
        "description": "Matches orders with user sessions",
        "depends_on": [
            {"task_key": "Orders_Ingest"},
            {"task_key": "Sessionize"},
        ],
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "spark_conf": {"spark.speculation": True},
            "aws_attributes": {
                "availability": "SPOT",
                "zone_id": "us-west-2a",
            },
            "autoscale": {"min_workers": 2, "max_workers": 16},
        },
        "notebook_task": {
            "notebook_path": "/Users/user.name@databricks.com/Match",
            "base_parameters": {"name": "John Doe", "age": "35"},
        },
        "timeout_seconds": 86400,
    },
]
  • run_name: An optional name for the run. The default value is Untitled, e.g. A multitask job run.
  • git_source: This functionality is in Public Preview. An optional specification for a remote repository containing the notebooks used by this job’s notebook tasks. Key-values:
  • git_url: URL of the repository to be cloned by this job. The maximum length is 300 characters, e.g. https://github.com/databricks/databricks-cli.
  • git_provider: Unique identifier of the service used to host the Git repository. The value is case insensitive, e.g. github.
  • git_branch: Name of the branch to be checked out and used by this job. This field cannot be specified in conjunction with git_tag or git_commit. The maximum length is 255 characters, e.g. main.
  • git_tag: Name of the tag to be checked out and used by this job. This field cannot be specified in conjunction with git_branch or git_commit. The maximum length is 255 characters, e.g. release-1.0.0.
  • git_commit: Commit to be checked out and used by this job. This field cannot be specified in conjunction with git_branch or git_tag. The maximum length is 64 characters, e.g. e0056d01.
  • git_snapshot: Read-only state of the remote repository at the time the job was run. This field is only included on job runs.
  • timeout_seconds: An optional timeout applied to each run of this job. The default behavior is to have no timeout, e.g. 86400.
  • idempotency_token: An optional token that can be used to guarantee the idempotency of job run requests. If a run with the provided token already exists, the request does not create a new run but returns the ID of the existing run instead. If a run with the provided token is deleted, an error is returned. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run is launched with that idempotency token. This token must have at most 64 characters. For more information, see How to ensure idempotency for jobs, e.g. 8f018174-4792-40d5-bcbc-3e6a527352c8.
  • access_control_list: List of permissions to set on the job.
  • max_wait_seconds: Maximum number of seconds to wait for the entire flow to complete.
  • poll_frequency_seconds: Number of seconds to wait in between checks for run completion.
  • return_metadata: When True, the method returns a tuple of the notebook output and the job run metadata; by default, only the notebook output is returned.
  • job_submission_handler: An optional callable to intercept job submission; see the sketch after this argument list.
  • **jobs_runs_submit_kwargs: Additional keyword arguments to pass to jobs_runs_submit.
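This reference does not document the handler's call signature, so a defensive, purely hypothetical sketch of a job_submission_handler that logs whatever it receives might look like:

def log_submission(*args, **kwargs):
    # hypothetical handler: the arguments the flow passes are not
    # specified in this reference, so accept and log anything
    print("Databricks job submitted:", args, kwargs)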
Returns:
  • Either a dict or a tuple, depending on return_metadata, consisting of:
    • task_notebook_outputs: dictionary mapping each task key to its corresponding notebook output; this is the only object returned by default
    • jobs_runs_metadata: dictionary containing the IDs of the job run's tasks; this is only returned if return_metadata=True.
Examples: Submit jobs runs and wait.
from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)

@flow
def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
    databricks_credentials = DatabricksCredentials.load("BLOCK_NAME")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    multi_task_runs = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return multi_task_runs
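
To also receive the run metadata, a minimal variant of the flow above (reusing the same imports; the block name and cluster ID are placeholders) could pass return_metadata=True and unpack the resulting tuple:

@flow
def jobs_runs_submit_with_metadata_flow(notebook_path):
    databricks_credentials = DatabricksCredentials.load("BLOCK_NAME")

    job_task_settings = JobTaskSettings(
        notebook_task=NotebookTask(notebook_path=notebook_path),
        existing_cluster_id="0923-164208-meows279",  # placeholder cluster ID
        task_key="prefect-task",
    )

    # with return_metadata=True the flow returns an (outputs, metadata) tuple
    task_notebook_outputs, jobs_runs_metadata = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
        return_metadata=True,
    )
    return task_notebook_outputs, jobs_runs_metadata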

jobs_runs_wait_for_completion

jobs_runs_wait_for_completion(multi_task_jobs_runs_id: int, databricks_credentials: DatabricksCredentials, run_name: Optional[str] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10)
Flow that waits for a previously triggered job run to complete. Args:
  • multi_task_jobs_runs_id: The ID of the jobs runs task to watch.
  • databricks_credentials: Credentials to use for authentication with Databricks.
  • run_name: The name of the jobs runs task.
  • max_wait_seconds: Maximum number of seconds to wait for the entire flow to complete.
  • poll_frequency_seconds: Number of seconds to wait in between checks for run completion.
Returns:
  • A dict containing the jobs runs life cycle state and message.
  • A dict containing IDs of the jobs runs tasks.
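
A minimal sketch of waiting on a run that was already submitted elsewhere; the run ID and block name below are placeholders:

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion

@flow
def wait_for_databricks_run_flow():
    databricks_credentials = DatabricksCredentials.load("BLOCK_NAME")

    # 36108 is a placeholder ID from a prior jobs runs submit call;
    # the flow returns the run's terminal state and the run metadata
    jobs_runs_state, jobs_runs_metadata = jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=36108,
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        poll_frequency_seconds=10,
    )
    return jobs_runs_state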

jobs_runs_submit_by_id_and_wait_for_completion

jobs_runs_submit_by_id_and_wait_for_completion(databricks_credentials: DatabricksCredentials, job_id: int, idempotency_token: Optional[str] = None, jar_params: Optional[List[str]] = None, max_wait_seconds: int = 900, poll_frequency_seconds: int = 10, notebook_params: Optional[Dict] = None, python_params: Optional[List[str]] = None, spark_submit_params: Optional[List[str]] = None, python_named_params: Optional[Dict] = None, pipeline_params: Optional[str] = None, sql_params: Optional[Dict] = None, dbt_commands: Optional[List] = None, job_submission_handler: Optional[Callable] = None, **jobs_runs_submit_kwargs: Dict[str, Any]) -> Dict
Flow that triggers an existing job and waits for its completion. Args:
  • databricks_credentials: Credentials to use for authentication with Databricks.
  • job_id: The ID of the Databricks job.
  • idempotency_token: An optional token that can be used to guarantee the idempotency of job run requests. If a run with the provided token already exists, the request does not create a new run but returns the ID of the existing run instead. If a run with the provided token is deleted, an error is returned. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run is launched with that idempotency token. This token must have at most 64 characters. For more information, see How to ensure idempotency for jobs, e.g. 8f018174-4792-40d5-bcbc-3e6a527352c8.
  • jar_params: A list of parameters for jobs with Spark JAR tasks, for example "jar_params": ["john doe", "35"]. The parameters are used to invoke the main function of the main class specified in the Spark JAR task. If not specified upon run-now, it defaults to an empty list. jar_params cannot be specified in conjunction with notebook_params. The JSON representation of this field (for example {"jar_params": ["john doe", "35"]}) cannot exceed 10,000 bytes.
  • max_wait_seconds: Maximum number of seconds to wait for the entire flow to complete.
  • poll_frequency_seconds: Number of seconds to wait in between checks for run completion.
  • notebook_params: A map from keys to values for jobs with notebook task, for example "notebook_params": {"name": "john doe", "age": "35"}. The map is passed to the notebook and is accessible through the dbutils.widgets.get function. If not specified upon run-now, the triggered run uses the job's base parameters. notebook_params cannot be specified in conjunction with jar_params. Use Task parameter variables to set parameters containing information about job runs. The JSON representation of this field (for example {"notebook_params": {"name": "john doe", "age": "35"}}) cannot exceed 10,000 bytes.
  • python_params: A list of parameters for jobs with Python tasks, for example "python_params": ["john doe", "35"]. The parameters are passed to the Python file as command-line parameters. If specified upon run-now, it would overwrite the parameters specified in the job setting. The JSON representation of this field (for example {"python_params": ["john doe", "35"]}) cannot exceed 10,000 bytes. Use Task parameter variables to set parameters containing information about job runs. These parameters accept only Latin characters (ASCII character set). Using non-ASCII characters returns an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
  • spark_submit_params: A list of parameters for jobs with spark submit task, for example "spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"]. The parameters are passed to the spark-submit script as command-line parameters. If specified upon run-now, it would overwrite the parameters specified in the job setting. The JSON representation of this field (for example {"spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"]}) cannot exceed 10,000 bytes. Use Task parameter variables to set parameters containing information about job runs. These parameters accept only Latin characters (ASCII character set). Using non-ASCII characters returns an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
  • python_named_params: A map from keys to values for jobs with Python wheel task, for example "python_named_params": {"name": "task", "data": "dbfs:/path/to/data.json"}.
  • pipeline_params: If full_refresh is set to true, triggers a full refresh of the Delta Live Tables pipeline, e.g.
    "pipeline_params": {"full_refresh": true}
  • sql_params: A map from keys to values for SQL tasks, for example "sql_params": {"name": "john doe", "age": "35"}. The SQL alert task does not support custom parameters.
  • dbt_commands: An array of commands to execute for jobs with the dbt task, for example "dbt_commands": ["dbt deps", "dbt seed", "dbt run"].
  • job_submission_handler: An optional callable to intercept job submission.
Raises:
  • DatabricksJobTerminated: Raised when the Databricks job run is terminated with a non-successful result state.
  • DatabricksJobSkipped: Raised when the Databricks job run is skipped.
  • DatabricksJobInternalError: Raised when the Databricks job run encounters an internal error.
Returns:
  • A dictionary containing information about the completed job run.
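
As an illustration, a minimal sketch of triggering an existing job by ID; the job ID, block name, notebook parameters, and idempotency token are placeholders:

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)

@flow
def run_existing_job_flow():
    databricks_credentials = DatabricksCredentials.load("BLOCK_NAME")

    # 1234 is a placeholder job ID; notebook_params are passed through to
    # the job's notebook tasks and read via dbutils.widgets.get
    return jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        job_id=1234,
        notebook_params={"name": "john doe", "age": "35"},
        idempotency_token="8f018174-4792-40d5-bcbc-3e6a527352c8",
        max_wait_seconds=1800,
    )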

Classes

DatabricksJobTerminated

Raised when the Databricks job run terminates with a non-successful result state.

DatabricksJobSkipped

Raised when the Databricks job run is skipped.

DatabricksJobInternalError

Raised when the Databricks job run encounters an internal error.

DatabricksJobRunTimedOut

Raised when the Databricks job run does not complete within the configured maximum wait seconds.
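
Since these exceptions propagate out of the flows above, a caller can branch on the terminal state. A minimal sketch, assuming credentials and task settings built as in the earlier examples:

from prefect import flow
from prefect_databricks.flows import (
    DatabricksJobSkipped,
    DatabricksJobTerminated,
    jobs_runs_submit_and_wait_for_completion,
)

@flow
def handle_terminal_states_flow(databricks_credentials, tasks):
    try:
        return jobs_runs_submit_and_wait_for_completion(
            databricks_credentials=databricks_credentials,
            tasks=tasks,
        )
    except DatabricksJobTerminated:
        # the run finished in a non-successful result state
        ...
    except DatabricksJobSkipped:
        # the run was skipped
        ...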