Connecting to Salesforce using Python [aiosfstream]

Connect to the Salesforce Streaming API from Python, using the aiosfstream library, to consume Salesforce object events.


Library used: aiosfstream

Ref link: https://aiosfstream.readthedocs.io/en/latest/quickstart.html#connecting

 

Quick start:

 

  1. Authentication:
    1. To connect to the Salesforce Streaming API, all clients must authenticate themselves.
    2. The library supports several authentication methods:

Username - Password authentication (using SalesforceStreamingClient)

 

from aiosfstream import SalesforceStreamingClient

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

 

 

Refresh token authentication (using RefreshTokenAuthenticator with Client)

 

from aiosfstream import Client, RefreshTokenAuthenticator

auth = RefreshTokenAuthenticator(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    refresh_token="<refresh_token>"
)

client = Client(auth)

 

Authentication on sandbox orgs:

  • Remember to set sandbox=True
  • Make sure the username is of the form <username>.<sandbox_name>

 

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>.<sandbox_name>",
    password="<password>",
    sandbox=True
)

 

 

  2. Connecting:
    1. After creating the Client object, call open() to establish a connection with the server.
    2. The connection is closed and the session is terminated by calling close().

 

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)

await client.open()

# subscribe and receive messages

await client.close()

 

    3. Client objects can also be used as asynchronous context managers:

 

async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>") as client:
    # subscribe and receive messages
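Everything above is a coroutine, so it has to run inside an event loop. Below is a minimal end-to-end sketch, assuming Python 3.7+ and a placeholder channel name (/topic/foo); subscribe() and the async for loop are covered in the sections that follow:

import asyncio

from aiosfstream import SalesforceStreamingClient


async def main():
    # credentials and channel name below are placeholders
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>") as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            print(message)


asyncio.run(main())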

 

 

 

  3. Channels:
    1. A channel is a string that looks like a URL path

ex: /topic/foo or /topic/bar

 

 

  4. Subscriptions:
    1. To receive notification messages, the client must subscribe to the channels it's interested in:

await client.subscribe("/topic/foo")

    2. If we no longer want messages from the channel, we can unsubscribe:

await client.unsubscribe("/topic/foo")

    3. The current set of subscriptions can be obtained from the Client.subscriptions attribute.
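A quick sketch of inspecting the attribute, assuming it behaves like a set of channel-name strings (channel names are placeholders):

await client.subscribe("/topic/foo")
await client.subscribe("/topic/bar")

print(client.subscriptions)   # e.g. {"/topic/foo", "/topic/bar"}

await client.unsubscribe("/topic/bar")
print(client.subscriptions)   # e.g. {"/topic/foo"}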

 

  5. Receiving messages:
    1. To receive messages broadcast by Salesforce after subscribing to channels, use the receive() method:

message = await client.receive()

    2. receive() will wait until a message arrives. It can raise:

TransportTimeoutError - when the connection with the server is lost and the client can't re-establish it

ServerError - if the connection gets closed by the server
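A hedged sketch of handling both failure modes, assuming the exception classes are importable from aiosfstream.exceptions:

from aiosfstream.exceptions import ServerError, TransportTimeoutError

try:
    message = await client.receive()
    # process message
except TransportTimeoutError:
    # connection was lost and could not be re-established
    raise
except ServerError:
    # the server closed the connection
    raise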

    3. The client can also be used as an asynchronous iterator in a for loop to wait for incoming messages:

async for message in client:
    # process message

 

 

 

  6. Replay of Events:
    1. The advantage of streaming is getting notified of events as they occur. The downside is that the client can get temporarily disconnected (hardware/software/network failures), so it might miss some of the messages emitted by the server.
    2. Salesforce event durability - 24 hours:
      1. Salesforce stores events for 24 hours
      2. Events older than the 24-hour retention period are discarded
      3. Salesforce extends the event messages with:
        1. replayId
        2. createdDate

These fields are called ReplayMarkers by aiosfstream. They are used by the client to request the missed event messages from the server when it reconnects, which enables it to process the event messages from where it left off.

    3. The most convenient choice is a Shelf object, which can store ReplayMarkers on disk between application restarts:

 

import shelve

from aiosfstream import SalesforceStreamingClient

with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay) as client:

        await client.subscribe("/topic/foo")

        async for message in client:
            # process message
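For context, the ReplayMarker fields appear inside each incoming event message. The shape below follows the standard Salesforce PushTopic streaming-message format; all values are illustrative:

# A received PushTopic message looks roughly like this:
message = {
    "channel": "/topic/foo",
    "data": {
        "event": {
            "createdDate": "2020-01-01T12:00:00.000Z",
            "replayId": 42,  # stored by the client as part of the ReplayMarker
            "type": "updated",
        },
        "sobject": {
            "Id": "001xx000003DGb2AAG",
            # ...fields selected by the PushTopic query
        },
    },
}

With the replay shelf in place, the client picks the stored replayId up on reconnect and asks the server for the events it missed.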

 

