While running a Databricks job, especially one with large datasets and long-running queries that create a lot of temp space, we might hit a driver error about the total result size exceeding the configured limit if the cluster is left at a minimal configuration.
The simple way to fix this is to change the Spark driver config in the Databricks cluster's Spark configuration tab:
spark.driver.maxResultSize = 100G (adjust the value based on your cluster size)
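A minimal sketch for verifying the currently applied value from a notebook cell (this assumes the SparkSession that Databricks exposes as spark; note that driver settings like this generally have to be set when the cluster starts, not at runtime):
# read back the effective driver setting
print(spark.conf.get("spark.driver.maxResultSize"))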
To everyone out there who wants to become a Data Engineer, keep following this blog, as I am on the same path as you and interested in solving data challenges big and small. Exposure to many tools and technologies is nice to have, but what's a must is understanding the underlying concepts, technical architectures, and internals of a tool. We become better data engineers only when we try things out, learn something new, and gain new tech experience. Only when we know what each tool does, along with the pros and cons of using it, can we select the right tools to solve the right problems. So I want to catalog all these learnings, as it might help someone out there who is on the same path as me. Just sharing :)
Primary skills to become a data engineer:
1. Programming skills (Java/Python/Scala)
2. Querying Skills (SQL/Hive QL/Spark SQL)
3. ETL architectures (Batch/Streaming)
4. Data warehousing concepts / Database Design
5. Cloud computing (AWS/GCP/Azure)
6. Big Data (Hadoop/Spark)
7. Familiarity with scripting/automation - Python/Shell
Nice to have skills:
1. Versioning tools (Git)
2. Automating deployments (Jenkins)
3. Writing efficient stored procedures and functions (SQL) - yes, I mean those hundreds of lines of SQL code
When connecting to a sandbox org, make sure the username is in the form <username>.<sandbox_name>:
client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>.<sandbox_name>",
    password="<password>",
    sandbox=True
)
Connecting:
After creating the client object, call open() to establish a connection with the server. The connection is closed and the session is terminated by calling close().
client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)
await client.open()
# subscribe and receive messages
await client.close()
Client objects can also be used as asynchronous context managers:
async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>") as client:
    # subscribe and receive messages
Channels:
A channel is a string that looks like a URL path, for example /topic/foo or /topic/bar.
Subscriptions:
To receive notification messages, the client must subscribe to the channels it's interested in:
await client.subscribe("/topic/foo")
If we no longer want messages from a channel, we can unsubscribe:
await client.unsubscribe("/topic/foo")
The current set of subscriptions can be obtained from the Client.subscriptions attribute.
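For example, a quick sanity check of what the client is currently subscribed to (the printed value shown here is only an assumption of what the collection would contain):
print(client.subscriptions)  # e.g. {"/topic/foo"}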
Receiving messages:
To receive messages broadcast by Salesforce after subscribing to channels, use the receive() method:
message = await client.receive()
receive() will wait until a message arrives. It raises TransportTimeoutError when the connection with the server is lost and the client cannot re-establish it, and ServerError if the connection gets closed by the server.
The client can also be used as an asynchronous iterator in a for loop to wait for incoming messages:
async for message in client:
    # process message
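A minimal error-handling sketch based on the exceptions described above (I'm assuming they can be imported from the aiosfstream package; treat the import path as an assumption):
from aiosfstream import TransportTimeoutError, ServerError  # import path assumed

try:
    message = await client.receive()
except TransportTimeoutError:
    # connection lost and could not be re-established
    pass
except ServerError:
    # the server closed the connection
    pass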
Replay of Events:
Streaming's advantage is that we get notified of events as they occur. The downside is that the client can get temporarily disconnected (hardware/software/network failures), so it might miss some of the messages emitted by the server.
Salesforce event durability - 24 hours:
Salesforce stores events for 24 hours; events beyond the 24-hour retention period are discarded.
Salesforce extends the event messages with:
- replayId
- createdDate
These are called ReplayMarkers by aiosfstream. They are used by the client to request the missed event messages from the server when it reconnects, which enables it to process the event messages from where it left off.
The most convenient choice is a Shelf object, which can store ReplayMarkers on disk between application restarts.
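A hedged sketch of persisting ReplayMarkers with a Shelf, assuming the client accepts it through a replay parameter as its replay storage (the file name replay.db is just an example):
import shelve

with shelve.open("replay.db") as replay_store:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay_store) as client:  # replay parameter assumed
        await client.subscribe("/topic/foo")
        async for message in client:
            # process message; its ReplayMarker is stored in the shelf between restarts
            pass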
# Review the airflow config file found under the AIRFLOW_HOME dir, or go to the UI and follow the Admin -> Configuration menu.
$ cat airflow.cfg
We can learn more about Airflow's features from the configuration file, for example:
It can store logs remotely in AWS S3, Google Cloud Storage or Elasticsearch (enable remote logging and specify remote_log_conn_id)
default_timezone = UTC
specify executor to use: executor = SequentialExecutor (Some of the executors include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor)
to configure the Airflow metadata DB, refer to the property sql_alchemy_conn (default is SQLite, but many different database engines can be used)
To specify the schema to use for the metadata database (SqlAlchemy supports databases with the concept of multiple schemas): sql_alchemy_schema =
parallelism = 32 (indicates the max no of task instances that should run simultaneously)
dag_concurrency = 16 (no of task instances allowed to run concurrently by the scheduler)
dags_are_paused_at_creation = True (whether DAGs are paused by default at creation)
max_active_runs_per_dag = 16 (max no of active DAG runs per DAG)
load_examples = True (better to set this to False in PROD environment)
plugins_folder = /User/Desktop/airflow/plugins (folder where airflow plugins are stored)
fernet_key = (secret key to save connection passwords in db)
dagbag_import_timeout = 30 (how long before timing out a python file import)
dag_file_processor_timeout = 50 (how long before timing out a DagFileProcessor, which process a dag file)
task_runner = StandardTaskRunner (the class to use for running task instances in a subprocess)
security = kerberos (which security model to use)
unit_test_mode = False (turn to True will overwrite many configuration options with test values at run time)
default_task_retries = 0 (number of retries each task is going to have by default)
endpoint_url = http://localhost:8080/ (if web_server_url_prefix is set, we must append it here)
auth_backend = (specifies how to authenticate users of the API)
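As an illustrative sketch (Airflow 1.x-era section layout; the values are examples, not recommendations), a few of the settings above would appear in airflow.cfg roughly like this:
[core]
default_timezone = utc
executor = SequentialExecutor
sql_alchemy_conn = sqlite:////User/Desktop/airflow/airflow.db
parallelism = 32
dag_concurrency = 16
load_examples = False

[cli]
endpoint_url = http://localhost:8080/

[api]
auth_backend = airflow.api.auth.backend.default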
Additional config properties:
- For CeleryExecutor, go to [celery]
- Kerberos - [kerberos]
- ElasticSearch - [elasticsearch]
- ElasticSearchConfigs - [elasticsearch_configs]
- KubernetesConfigs - [kubernetes]
The PID file for the webserver will be stored in $AIRFLOW_HOME/airflow-webserver.pid (OR)
/run/airflow/webserver.pid (if started by systemd)
By default, Airflow uses a SQLite DB, for which parallelization is not possible.
It works in conjunction with airflow.executors.sequential_executor.SequentialExecutor, which will only run task instances sequentially.
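To run tasks in parallel, the metadata DB typically has to be moved to a server-based engine and the executor switched; a hedged example (the Postgres connection string and credentials below are placeholders):
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow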
# To trigger a few task instances:
# running the first task instance
$ airflow run example_bash_operator runme_0 2015-01-01
# run a backfill over 2 days
$ airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
Terraform - infrastructure as code.
- To build, manage and modify the infrastructure in a safe and repeatable way
Why terraform?
- to manage environments using configuration language
- here it uses HCL - HashiCorp Configuration Language
Infrastructure as code?
- Instead of using the UI to create resources, we use a file/files to manage infrastructure
- Resource: any piece of infrastructure (e.g. virtual machine, security group, network interface)
- Provider - AWS, GCP, GitHub, Docker
- automates the creation of resources at the time of apply
Advantages of IAC:
- Easily repeatable
- easily readable
- operational certainty with "terraform plan"
- standardized environment builds
- quickly provisioned development environments
- disaster recovery
# a specific type of resource that we want to create; the second label ("example") is its ID
resource "aws_instance" "example" {
  ami           = "ami-8347957"
  instance_type = "t2.micro"
}
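A provider block usually accompanies this, telling Terraform which platform and region to target (a hedged sketch; the region is just an example):
provider "aws" {
  region = "us-east-1"
}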
Terraform workflow to deploy:
1. scope - confirm what resources need to be created for a given project
2. author - create the configuration file in HCL based on the scoped parameters
3. initialize - run terraform init in the project directory with the configuration files, this will download the correct provider plug-ins for the project
4. plan and apply:
- terraform plan -> to verify creation process
- terraform apply -> to create real resources as well as state file that compares future changes in our configuration files to what actually exists in the development environment
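The corresponding commands, run from the project directory (a minimal sketch with no extra flags):
$ terraform init
$ terraform plan
$ terraform apply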