Wednesday, November 18, 2020

Databricks: Job aborted due to stage failure. Total size of serialized results is bigger than spark driver memory.

 

While running a Databricks job, especially one with large datasets and long-running queries that create a lot of temp space, we might face the issue below if the cluster has a minimal configuration. 








The simple way to fix this is to change the Spark driver config under the Databricks cluster's Spark configuration:

spark.driver.maxResultSize = 100g (adjust the size based on your cluster)
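For reference, the same property can also be set when building a SparkSession in code. A minimal sketch with a placeholder value (on Databricks itself, the cluster-level Spark config above is the usual place, since this property must be set before the driver starts):

from pyspark.sql import SparkSession

# placeholder value; size it to your workload and cluster
spark = (
    SparkSession.builder
    .appName("large-result-job")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)

print(spark.conf.get("spark.driver.maxResultSize"))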


Tuesday, November 17, 2020

How to become a data engineer?

 To everyone out there who wants to become a Data Engineer: keep following this blog, as I am on the same path as you. I am interested in solving data challenges of any size (big or small). Having exposure to many tools and technologies is nice to have, but what's a must is understanding the underlying concepts, technical architectures, and the internals of a tool. We become better data engineers only by trying things out, learning something new, and gaining new technical experience. Only if we know what each tool does, along with the pros and cons of using it, can we select the right tools to solve the right problems. So I want to catalog all my learnings here, hoping it helps someone out there who is on the same path as me. Just sharing :) 


Primary skills to become a data engineer:

1. Programming skills (Java/Python/Scala)

2. Querying Skills (SQL/Hive QL/Spark SQL)

3. ETL architectures (Batch/Streaming)

4. Data warehousing concepts / Database Design

5. Cloud computing (AWS/GCP/Azure)

6. Big Data (Hadoop/Spark)

7. Familiarity with scripting/automation - Python/Shell


Nice to have skills:

1. Versioning tools (Git)

2. Automating deployments (Jenkins)

3. Writing efficient stored procedures and functions (SQL) - yes, I mean those hundreds of lines of SQL code

4. Tools (Databricks, Pentaho, Sqoop, Online Editors)

5. Building data lakes and DWHs (it really helps to build one using the traditional approach and then try to migrate it to the cloud). 


Wednesday, August 12, 2020

Connecting to Salesforce using Python [aiosfstream]

Connect to the Salesforce Streaming API using Python to consume Salesforce objects. 


Library used: aiosfstream

Ref link: https://aiosfstream.readthedocs.io/en/latest/quickstart.html#connecting

 

Quick start:

 

  1. Authentication:
    1. To connect to the Salesforce Streaming API, all clients must authenticate themselves.
    2. Several authentication methods are supported:

Username - password authentication (using SalesforceStreamingClient)

 

from aiosfstream import SalesforceStreamingClient

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

 

 

Refresh token authentication

 

from aiosfstream import Client, RefreshTokenAuthenticator

auth = RefreshTokenAuthenticator(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    refresh_token="<refresh_token>"
)

client = Client(auth)

 

Authentication args on a sandbox org:

  • Remember to set sandbox=True
  • Make sure the username is <username>.<sandbox_name>

 

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>.<sandbox_name>",
    password="<password>",
    sandbox=True
)

 

 

  2. Connecting:
    1. After creating the Client object, call open() to establish a connection with the server.
    2. The connection is closed and the session is terminated by calling close().

 

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

await client.open()

# subscribe and receive messages

await client.close()

 

    3. Client objects can also be used as asynchronous context managers:

 

async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>") as client:
    # subscribe and receive messages
    ...

 

 

 

  3. Channels:
    1. A channel is a string that looks like a URL path, e.g. /topic/foo or /topic/bar.

 

 

  4. Subscriptions:
    1. To receive notification messages, the client must subscribe to the channels it's interested in:

await client.subscribe("/topic/foo")

    2. If we no longer want messages from a channel, we can unsubscribe:

await client.unsubscribe("/topic/foo")

    3. The current set of subscriptions can be obtained from the Client.subscriptions attribute.
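For example (a tiny sketch, assuming the connected and subscribed client from the snippets above):

# the attribute holds the set of channels the client is subscribed to
print(client.subscriptions)  # e.g. {"/topic/foo"}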

 

  5. Receiving messages:
    1. To receive messages broadcast by Salesforce after subscribing to channels, use the receive() method:

message = await client.receive()

    2. receive() will wait until a message arrives. It raises TransportTimeoutError if the connection with the server is lost and the client cannot re-establish it, and ServerError if the connection is closed by the server (see the error-handling sketch after the iterator example below).

    3. The client can also be used as an asynchronous iterator in a for loop to wait for incoming messages:

async for message in client:
    # process message
    ...
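A rough error-handling sketch for the exceptions mentioned above (assuming TransportTimeoutError and ServerError are importable from aiosfstream.exceptions, as referenced in the quickstart):

from aiosfstream.exceptions import ServerError, TransportTimeoutError

try:
    message = await client.receive()
    # process message
except TransportTimeoutError:
    # connection was lost and the client could not re-establish it
    pass
except ServerError:
    # the connection was closed by the server
    pass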

 

 

 

  6. Replay of Events:
    1. The advantage of streaming is that we get notified of events as they occur. The downside is that the client can get temporarily disconnected (hardware/software/network failures), so it might miss some of the messages emitted by the server.
    2. Salesforce event durability - 24 hours:
      1. Salesforce stores events for 24 hours.
      2. Events older than the 24-hour retention period are discarded.
      3. Salesforce extends the event messages with:
        1. replayId
        2. createdDate

These fields are called ReplayMarkers by aiosfstream. They are used by the client to request the missed event messages from the server when it reconnects, which enables it to process the event messages from where it left off.

    3. The most convenient choice is a Shelf object, which can store ReplayMarkers on disk between application restarts:

 

import shelve

with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay) as client:

        await client.subscribe("/topic/foo")

        async for message in client:
            # process message
            ...
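Since all of the snippets above use await, they have to run inside a coroutine. A minimal end-to-end sketch (credentials and channel are placeholders) that ties the pieces together with asyncio.run():

import asyncio

from aiosfstream import SalesforceStreamingClient


async def main():
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>") as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            print(message)  # process the message here


asyncio.run(main())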

 


Tuesday, August 11, 2020

Copy data from S3 to Aurora Postgres

Scenario 1: To copy data from S3 to Aurora Postgres (version greater than 9, or the latest)

How? We can use the aws_s3.table_import_from_s3 function to migrate the data from S3 to Aurora Postgres. 

Steps:

A sample file with columns id, prefix, and mstr_id is copied to S3. 

Create schema on Aurora Postgres (with the required columns). 

drop table core.mstr;

CREATE TABLE core.mstr (
	id varchar(300) NULL,
	prefix varchar(300) NULL,
	mstr_id float8 NULL
);

Copy command to transfer the data from S3 to Aurora Postgres

SELECT aws_s3.table_import_from_s3(
   'core.MSTR', 
   'id,prefix,mstr_id', 
   '(format csv, header true)',
   '<bucket-name>',
   'MSTR_DATA/part_file_00.csv',
   'us-east-2',
   '<access key>', '<secret key>'
);


Note: If an IAM role is attached, we need not specify the access keys. 

SELECT aws_s3.table_import_from_s3(
   'core.MSTR', 
   'id,prefix,mstr_id', 
   '(format csv, header true)',
   '<bucket-name>',
   'MSTR_DATA/part_file_00.csv',
   'us-east-2'
);


Note: aws_s3.table_import_from_s3 function is not supported for versions earlier than Aurora Postgres Version 9.
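If we prefer to trigger the same import from Python instead of a SQL client, a minimal sketch using psycopg2 could look like the following (psycopg2 and the connection details are my own assumptions/placeholders, not part of the original setup):

import psycopg2

# connection details are placeholders
conn = psycopg2.connect(
    host="<aurora-endpoint>",
    dbname="<database>",
    user="<user>",
    password="<password>",
)

# run the same aws_s3.table_import_from_s3 call shown above
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT aws_s3.table_import_from_s3(
            'core.mstr',
            'id,prefix,mstr_id',
            '(format csv, header true)',
            '<bucket-name>',
            'MSTR_DATA/part_file_00.csv',
            'us-east-2'
        );
    """)
    print(cur.fetchone())

conn.close()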


Scenario 2: To copy data from S3 to Aurora Postgres (versions older than 9)





How? Use a Sqoop job to export the data from S3 to Aurora Postgres. 

Steps

Upload data files to s3

Create schema on Aurora Postgres (with the required columns). 

drop table core.mstr;

CREATE TABLE core.mstr (
	id varchar(300) NULL,
	prefix varchar(300) NULL,
	mstr_id float8 NULL
);

Spin up an EMR cluster
  • It must have access to S3 and Aurora Postgres
  • Check for connectivity: 
    • Whether the EMR cluster and RDS reside in the same VPC
      • dig <Aurora hostname>
      • nc -vz <hostname> <port> (should report that the connection succeeded)
  • Make sure the security groups are properly assigned to the EMR cluster: 
    • The security group must ALLOW traffic from the CORE node of the EMR cluster
    • It must be added to the RDS security group as a source

Sqoop Job

echo "Exporting MSTR table from S3 to Postgres"
echo "Sqoop Job in Progress"

sqoop export \
  --connect <jdbc URL> \
  --input-null-string NULL \
  --input-null-non-string NULL \
  --table mstr \
  --export-dir s3://<bucket_name>/MSTR/ \
  --username user_etl \
  -P \
  -- --schema test
echo "Sqoop Job is executed successfully"
echo "Data will now be available on Aurora Postgres"


Friday, April 3, 2020

Getting started with apache-airflow (Part1)

# Apache airflow
quick start link: https://airflow.apache.org/docs/stable/start.html



# export the AIRFLOW_HOME
vi ~/.bash_profile
# setting AIRFLOW HOME
export AIRFLOW_HOME=/User/Desktop/airflow/

cd $AIRFLOW_HOME

# create and activate a virtual environment
python3 -m venv ./venv
source ./venv/bin/activate

# to show the list of dependencies
pip3 list

# install apache airflow
pip3 install apache-airflow

# initialize the airflow database
$ airflow initdb

# starting the webserver on port 8080
$ airflow webserver -p 8080


Now we should be able to see the Airflow DAGs at the local URL:
http://localhost:8080/admin/


# start the scheduler
$ airflow scheduler


# Review the airflow config file found under the AIRFLOW_HOME dir, or go to the UI and follow the Admin -> Configuration menu.

$ cat airflow.cfg

We can learn more about Airflow features from the configuration file, for example:

  • It can store logs remotely in AWS S3, Google Cloud Storage or Elasticsearch (enable remote_logging and specify the remote_log_conn_id)
  • default_timezone = UTC
  • specify executor to use: executor = SequentialExecutor (Some of the executors include SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor)
  • for the Airflow local metadata DB, refer to the property sql_alchemy_conn (defaults to SQLite, but many different database engines can be used)
  • sql_alchemy_schema - specifies the schema to use for the metadata database (SqlAlchemy supports databases with the concept of multiple schemas)
  • parallelism = 32 (the max number of task instances that should run simultaneously)
  • dag_concurrency = 16 (the number of task instances allowed to run concurrently by the scheduler)
  • dags_are_paused_at_creation = True (whether DAGs are paused by default at creation)
  • max_active_runs_per_dag = 16 (the max number of active DAG runs per DAG)
  • load_examples = True (better to set this to False in PROD environment)
  • plugins_folder = /User/Desktop/airflow/plugins (folder where airflow plugins are stored)
  • fernet_key = (secret key to save connection passwords in db)
  • dagbag_import_timeout = 30 (how long before timing out a python file import)
  • dag_file_processor_timeout = 50 (how long before timing out a DagFileProcessor, which processes a DAG file)
  • task_runner = StandardTaskRunner (the class to use for running task instances in a subprocess)
  • security = kerberos (which security model to use)
  • unit_test_mode = False (turn to True will overwrite many configuration options with test values at run time)
  • default_task_retries = 0 (number of retries each task is going to have by default)
  • endpoint_url = http://localhost:8080/ (if web_server_url_prefix is set, we must append it here)
  • auth_backend - the backend used to authenticate users of the API
Additional config properties:



- For CeleryExecutor, go to [celery]
- Kerberos - [kerberos]
- ElasticSearch - [elasticsearch]
- Elastic Search Configs - [elasticsearch_configs]
- Kubernetes Configs - [kubernetes]

The PID file for the webserver will be stored in 
$AIRFLOW_HOME/airflow-webserver.pid (OR)
/run/airflow/webserver.pid (if started by systemd)

By default, Airflow uses a SQLite DB, for which parallelization is not possible.
It works in conjunction with the airflow.executors.sequential_executor.SequentialExecutor, which will only run task instances sequentially.

# To trigger a few task instances:





# running the first task instance
airflow run example_bash_operator runme_0 2015-01-01


# run a backfill over 2 days
airflow backfill example_bash_operator -s 2015-01-01 -e 2015-01-02
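Going one step beyond the bundled examples, here is a minimal sketch of our own DAG file (the dag_id and task are hypothetical; drop the file into $AIRFLOW_HOME/dags and the scheduler will pick it up). The imports match the Airflow 1.x CLI used above:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 1, 1),
    "retries": 0,
}

# a tiny DAG with a single BashOperator task
dag = DAG(
    dag_id="hello_dag",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
)

say_hello = BashOperator(
    task_id="say_hello",
    bash_command="echo 'hello from airflow'",
    dag=dag,
)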

Wednesday, February 19, 2020

Getting started with terraform

Install terraform:
brew install terraform

Terraform - infrastructure as code.
- To build, manage and modify the infrastructure in a safe and repeatable way

Why terraform?
- to manage environments using a configuration language
- Terraform uses HCL - HashiCorp Configuration Language


Infrastructure as code?
- Instead of using the UI to create resources, we use a file (or files) to manage infrastructure
- Resource: any piece of infrastructure (e.g. virtual machine, security group, network interface)
- Provider: AWS, GCP, GitHub, Docker
- Automates the creation of resources at apply time


Advantages of IaC:
- Easily repeatable
- Easily readable
- Operational certainty with "terraform plan"
- Standardized environment builds
- Quickly provisioned development environments
- Disaster recovery

provider "aws" {
      access_key = "ACCESS_KEY",
      secret_key = "SECRET_KEY",
      region = "us-east-1"
}

# specific type of resource that we want to create, using ID - here example
resource "aws_instance" "example" {
       ami = "ami-8347957"
      instance_type = "t2.micro"
}



Terraform workflow to deploy:
1. scope - confirm what resources need to be created for a given project
2. author - create the configuration file in HCL based on the scoped parameters
3. initialize - run terraform init in the project directory with the configuration files, this will download the correct provider plug-ins for the project
4. plan and apply:
        - terraform plan -> to verify creation process
        - terraform apply -> to create the real resources, as well as a state file that is used to compare future changes in our configuration files to what actually exists in the environment

create a terraform template (.tf)

provider "aws" {
 profile = "default"
 region = "us-east-1"
}

module "sns-topics" {
 source = "devops-workflow/sns-topics/aws"
 names = ["mt_teraform_example"]
 environment = "dev"

}


initialize the terraform template:

(base) HML-1612:terraform ladu$ terraform init


now apply the template

(base) HML-1612:terraform ladu$ terraform apply


now, to destroy the terraform template:


(base) HML-1612:terraform ladu$ terraform destroy

Tuesday, January 21, 2020

Explode function using PySpark

Sometimes the data frame we get by reading/parsing JSON cannot be used as-is for our processing or analysis.

Explode function to the rescue.

When df.printSchema() shows a column as an array of structs, using the explode function is a little trickier than it is for an array of plain elements.

A sample script that worked for me to explode an array of structs:


"""python

from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.appName('test-explode').getOrCreate()

sqlContext = SQLContext(spark)
df = sqlContext.read.json("<json file name>")


exploded_df = df.select("id", explode("names")).select("id", "col.first_name", "col.middle_name", "col.last_name")

exploded_df.show()

"""


To filter out based on a condition:

# (assumes the DataFrame also contains GENDER and names columns)
male_names_list = exploded_df.filter(exploded_df.GENDER == 'M').select("names").collect()
female_names_list = exploded_df.filter(exploded_df.GENDER == 'F').select("names").collect()


# helper to compare the matches in both lists:
def return_matches(list1, list2):
    return list(set(list1) & set(list2))


# to get names that are common in both males and females:
compare_names = return_matches(male_names_list, female_names_list)
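To make the explode example easy to try without a JSON file, here is a self-contained sketch with made-up rows (the data is purely illustrative) that builds an array-of-structs column and explodes it:

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import explode

spark = SparkSession.builder.appName("explode-demo").getOrCreate()

# illustrative data: each row has an id and an array of name structs
data = [
    Row(id="1", names=[Row(first_name="Ada", middle_name="M", last_name="Lovelace")]),
    Row(id="2", names=[Row(first_name="Alan", middle_name="X", last_name="Turing"),
                       Row(first_name="Grace", middle_name="B", last_name="Hopper")]),
]
df = spark.createDataFrame(data)
df.printSchema()  # names is inferred as array<struct<...>>

# explode produces one output row per struct; the generated column is named "col"
exploded_df = (
    df.select("id", explode("names"))
      .select("id", "col.first_name", "col.middle_name", "col.last_name")
)
exploded_df.show()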