Wednesday, August 12, 2020

Connecting to Salesforce using Python [aiosfstream]

Connect to the Salesforce Streaming API using the Python library aiosfstream to consume Salesforce objects/events.


Library used: aiosfstream

Ref link: https://aiosfstream.readthedocs.io/en/latest/quickstart.html#connecting

 

Quick start:

 

  1. Authentication:
    1. To connect to the Salesforce Streaming API, all clients must authenticate themselves.
    2. The library supports several authentication methods:

Username - password authentication (using SalesforceStreamingClient)

 

from aiosfstream import SalesforceStreamingClient

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

 

 

Refresh token authentication

 

from aiosfstream import Client, RefreshTokenAuthenticator

auth = RefreshTokenAuthenticator(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    refresh_token="<refresh_token>"
)
client = Client(auth)

 

Authentication on a sandbox org:

  • Remember to set sandbox=True
  • Make sure the username is in the form <username>.<sandbox_name>

 

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>.<sandbox_name>",
    password="<password>",
    sandbox=True
)

 

 

  2. Connecting:
    1. After creating the Client object, call open() to establish a connection with the server.
    2. The connection is closed and the session is terminated by calling close().

 

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

await client.open()

# subscribe and receive messages

await client.close()

 

    3. Client objects can also be used as asynchronous context managers:

 

async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>") as client:
    # subscribe and receive messages
    ...

 

 

 

  3. Channels:
    1. A channel is a string that looks like a URL path.

ex: /topic/foo or /topic/bar

 

 

  4. Subscriptions:
    1. To receive notification messages, the client must subscribe to the channels it's interested in:

await client.subscribe("/topic/foo")

    2. If we no longer want messages from a channel, we can unsubscribe:

await client.unsubscribe("/topic/foo")

    3. The current set of subscriptions can be obtained from the Client.subscriptions attribute, as shown in the sketch below.
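For example, a minimal sketch (channel names are placeholders):

await client.subscribe("/topic/foo")
await client.subscribe("/topic/bar")

# the set of channel names the client is currently subscribed to
print(client.subscriptions)   # e.g. {'/topic/foo', '/topic/bar'}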

 

  5. Receiving messages:
    1. To receive messages broadcast by Salesforce after subscribing to the channels, the receive() method should be used:

message = await client.receive()

    2. receive() will wait until a message arrives. It raises TransportTimeoutError when the connection to the server is lost and the client cannot re-establish it, and ServerError if the connection gets closed by the server (see the sketch below).
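A minimal sketch of handling these errors around receive(); it assumes both exception classes can be imported from aiosfstream.exceptions:

from aiosfstream.exceptions import TransportTimeoutError, ServerError

try:
    message = await client.receive()
    # process message
except TransportTimeoutError:
    # connection was lost and the client could not re-establish it
    ...
except ServerError:
    # the server closed the connection or rejected the request
    ...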

    3. The client can also be used as an asynchronous iterator in a for loop to wait for incoming messages:

async for message in client:
    # process message
    ...
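The snippets above use bare await expressions; in a real program they run inside a coroutine driven by asyncio. A minimal end-to-end sketch (credentials and channel name are placeholders):

import asyncio

from aiosfstream import SalesforceStreamingClient


async def main():
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>") as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            print(message)  # process message


asyncio.run(main())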

 

 

 

  6. Replay of Events:
    1. The advantage of streaming is getting notified of events as they occur. The downside is that the client can get temporarily disconnected (hardware/software/network failures), so it might miss some of the messages emitted by the server.
    2. Salesforce event durability - 24 hours
      1. Salesforce stores events for 24 hours.
      2. Events older than the 24-hour retention period are discarded.
      3. Salesforce extends the event messages with:
        1. replayId
        2. createdDate

These fields are called ReplayMarkers by aiosfstream. They are used by the client to request the missed event messages from the server when it reconnects, which enables the client to process the event messages from where it left off.
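Any object that supports the MutableMapping protocol should work as the replay storage (which is what makes Shelf usable); as a minimal in-memory sketch, a plain dict can be passed, with the caveat that the markers are lost when the process exits. The Shelf example below persists them across restarts.

# in-memory replay storage: ReplayMarkers live only for the process lifetime
replay_storage = {}

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>",
    replay=replay_storage
)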

    3. The most convenient choice is a Shelf object, which can store ReplayMarkers on disk between application restarts:

 

import shelve

from aiosfstream import SalesforceStreamingClient

with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay) as client:

        await client.subscribe("/topic/foo")

        async for message in client:
            # process message
            ...

 


Tuesday, August 11, 2020

Copy data from S3 to Aurora Postgres

Scenario 1: Copy data from S3 to Aurora Postgres (versions newer than 9)

How?: Use the aws_s3.table_import_from_s3 function to migrate the data from S3 to Aurora Postgres.

Steps:

A sample file with columns - id, prefix, mstr_id is copied to S3. 

Create schema on Aurora Postgres (with the required columns). 

drop table core.mstr;

CREATE TABLE core.mstr (
	id varchar(300) NULL,
	prefix varchar(300) NULL,
	mstr_id float8 NULL
);
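Prerequisite: the aws_s3 extension must be installed once on the target database before the import function is available (CASCADE also installs its aws_commons dependency; this assumes the connected user has the required privileges):

CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE;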

Copy command to transfer the data from S3 to Aurora Postgres

SELECT aws_s3.table_import_from_s3(
   'core.mstr',
   'id,prefix,mstr_id',
   '(format csv, header true)',
   '<bucket-name>',
   'MSTR_DATA/part_file_00.csv',
   'us-east-2',
   '<access key>', '<secret key>'
);


Note: If an IAM role with S3 access is attached to the Aurora cluster, we need not specify access keys (see the role-association example after the statement below).

SELECT aws_s3.table_import_from_s3(
   'core.mstr',
   'id,prefix,mstr_id',
   '(format csv, header true)',
   '<bucket-name>',
   'MSTR_DATA/part_file_00.csv',
   'us-east-2'
);
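For the IAM-based variant to work, a role with read access to the bucket must be associated with the Aurora cluster for the s3Import feature, roughly as below (cluster identifier and role ARN are placeholders):

aws rds add-role-to-db-cluster \
  --db-cluster-identifier <cluster-id> \
  --feature-name s3Import \
  --role-arn <role-arn> \
  --region us-east-2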


Note: The aws_s3.table_import_from_s3 function is not supported for Aurora Postgres versions earlier than 9.


Scenario 2: Copy data from S3 to Aurora Postgres (versions older than 9)





How?: Use a Sqoop job to export the data from S3 to Aurora Postgres.

Steps

Upload data files to S3

Create schema on Aurora Postgres (with the required columns). 

drop table core.mstr;

CREATE TABLE core.mstr (
	id varchar(300) NULL,
	prefix varchar(300) NULL,
	mstr_id float8 NULL
);

Spin up an EMR Cluster
  • It must have access to S3 and Aurora Postgres
  • Check for connectivity: 
    • Whether the EMR cluster and RDS reside in the same VPC
      • dig <Aurora hostname>
      • nc -vz <Aurora hostname> <port> (should report that the connection succeeded; the default Postgres port is 5432)
  • Make sure the security groups are properly assigned to the EMR cluster. 
    • The security group must ALLOW traffic from the CORE node of the EMR cluster
    • i.e., the core node's security group must be added to the RDS security group as a source

Sqoop Job

echo "Exporting MSTR table from S3 to Postgres"
echo "Sqoop Job in Progress"

sqoop export \
  --connect <jdbc URL> \
  --input-null-string NULL \
  --input-null-non-string NULL \
  --table mstr \
  --export-dir s3://<bucket_name>/MSTR/ \
  --username user_etl \
  -P \
  -- --schema core

echo "Sqoop Job is executed successfully"
echo "Data will now be available on Aurora Postgres"