Connect to the Salesforce Streaming API using Python to consume Salesforce objects.
Library used: aiosfstream
Ref link: https://aiosfstream.readthedocs.io/en/latest/quickstart.html#connecting
Quick start:
- Authentication:
- To connect to the Salesforce Streaming API, all clients must authenticate themselves.
- aiosfstream supports several ways to authenticate:
Username-password authentication (using SalesforceStreamingClient)
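from aiosfstream import SalesforceStreamingClient

client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)
# Alternative: build an authenticator object and pass it to Client, i.e. client = Client(auth)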
Refresh token authentication
from aiosfstream import Client, RefreshTokenAuthenticator

auth = RefreshTokenAuthenticator(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    refresh_token="<refresh_token>"
)
client = Client(auth)
Authentication against a sandbox org:
- Remember to set sandbox=True
- Make sure the username has the form <username>.<sandbox_name>
client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>.<sandbox_name>",
    password="<password>",
    sandbox=True
)
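To avoid hard-coding credentials, one option is to read them from environment variables and build the keyword arguments in one place. This is only a sketch: the helper client_kwargs and the SF_* variable names are hypothetical and not part of aiosfstream. The returned dict is meant to be unpacked as SalesforceStreamingClient(**kwargs).

```python
import os


def client_kwargs(env=None, sandbox_name=None):
    """Build keyword arguments for SalesforceStreamingClient.

    The SF_* variable names are hypothetical; use whatever names
    your own deployment defines.
    """
    env = os.environ if env is None else env
    username = env["SF_USERNAME"]
    kwargs = {
        "consumer_key": env["SF_CONSUMER_KEY"],
        "consumer_secret": env["SF_CONSUMER_SECRET"],
        "username": username,
        "password": env["SF_PASSWORD"],
    }
    if sandbox_name:
        # Sandbox logins use <username>.<sandbox_name> together with sandbox=True.
        kwargs["username"] = f"{username}.{sandbox_name}"
        kwargs["sandbox"] = True
    return kwargs
```

A missing variable raises KeyError immediately, which surfaces a configuration mistake before any network call is made.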
- Connecting:
- After creating a Client object, call open() to establish a connection with the server
- Calling close() closes the connection and terminates the session
client = SalesforceStreamingClient(
    consumer_key="<consumer key>",
    consumer_secret="<consumer secret>",
    username="<username>",
    password="<password>"
)
# the awaits below must run inside a coroutine (async def)
await client.open()
# subscribe and receive messages
await client.close()
- Client objects can also be used as asynchronous context managers
async with SalesforceStreamingClient(
        consumer_key="<consumer key>",
        consumer_secret="<consumer secret>",
        username="<username>",
        password="<password>") as client:
    # subscribe and receive messages
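If open() and close() are called by hand instead of through the context manager, the session is left open whenever the code in between raises. A minimal sketch of one way to guard against that follows. The helper run_with_client is hypothetical; it only assumes the client has the open() and close() coroutines described above.

```python
async def run_with_client(client, work):
    """Open the client, await work(client), and always close the client."""
    await client.open()
    try:
        return await work(client)
    finally:
        # Runs on success and on error, much like leaving an `async with` block.
        await client.close()
```

It would be driven with something like asyncio.run(run_with_client(client, work)), where work is a coroutine function that subscribes and receives messages.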
- Channels:
- A channel is a string that looks like a URL path
ex: /topic/foo or /topic/bar
- Subscriptions:
- To receive notification messages the client must subscribe to the channels it's interested in
await client.subscribe("/topic/foo")
- If we no longer want messages from the channel, we can unsubscribe
await client.unsubscribe("/topic/foo")
- The current set of subscriptions can be obtained from the Client.subscriptions attribute
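The three pieces above (subscribe, unsubscribe and the subscriptions attribute) can be combined to bring a client in line with a desired list of channels. This is a sketch under the assumption that client.subscriptions is an iterable of channel name strings; the helper name sync_subscriptions is hypothetical.

```python
async def sync_subscriptions(client, wanted):
    """Subscribe and unsubscribe until the client's channels match `wanted`."""
    wanted = set(wanted)
    current = set(client.subscriptions)
    # Subscribe to channels that are wanted but not yet active.
    for channel in sorted(wanted - current):
        await client.subscribe(channel)
    # Drop channels that are active but no longer wanted.
    for channel in sorted(current - wanted):
        await client.unsubscribe(channel)
```

Calling it with ["/topic/foo", "/topic/bar"] on a client already subscribed to /topic/foo would only issue one subscribe call, for /topic/bar.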
- Receiving messages:
- To receive messages broadcast by Salesforce after subscribing to channels, use the receive() method.
message = await client.receive()
- receive() waits until a message is received. It raises TransportTimeoutError if the connection to the server is lost and the client cannot re-establish it, or ServerError if the server closes the connection.
- Client can also be used as an asynchronous iterator in a for loop to wait for incoming messages.
async for message in client:
    # process message
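What "process message" means depends on the channel. The sketch below assumes the message is a dict shaped like a PushTopic notification, with a "channel" key and a "data" dict holding "event" and "sobject". That shape is an assumption and should be checked against the messages actually received. The helpers summarize and consume are hypothetical names.

```python
def summarize(message):
    """Pull a few fields out of a message, tolerating missing keys."""
    data = message.get("data", {})
    event = data.get("event", {})
    return {
        "channel": message.get("channel"),
        "type": event.get("type"),
        "replay_id": event.get("replayId"),
        "record_id": data.get("sobject", {}).get("Id"),
    }


async def consume(client, handle, limit=None):
    """Pass each incoming message to handle(); stop after `limit` messages."""
    count = 0
    async for message in client:
        handle(message)
        count += 1
        if limit is not None and count >= limit:
            break
    return count
```

Using .get() throughout means an unexpected message shape yields None fields rather than an exception in the middle of the receive loop.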
- Replay of Events:
- Advantage of streaming: the client is notified of events as they occur. Downside: the client can be temporarily disconnected (hardware, software or network failures), so it might miss some of the messages emitted by the server.
- Salesforce event durability: events are stored for 24 hours, and events older than the 24-hour retention period are discarded.
- Salesforce extends the event messages with:
- replayId
- createdDate
aiosfstream calls this pair a ReplayMarker. The client uses it to request the missed event messages from the server when it reconnects, so it can continue processing event messages from where it left off.
- Most convenient choice for storing ReplayMarkers: a Shelf object, which keeps them on disk between application restarts.
import shelve

# run inside a coroutine (async def)
with shelve.open("replay.db") as replay:
    async with SalesforceStreamingClient(
            consumer_key="<consumer key>",
            consumer_secret="<consumer secret>",
            username="<username>",
            password="<password>",
            replay=replay) as client:
        await client.subscribe("/topic/foo")
        async for message in client:
            # process message
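To see why a Shelf survives restarts, and how the 24-hour retention limit interacts with a stored marker, here is a small standalone sketch. It does not reproduce how aiosfstream stores markers internally: the tuple (created_date, replay_id) is a stand-in, the helper names are hypothetical, and the createdDate format "YYYY-MM-DDTHH:MM:SS.fffZ" is an assumption.

```python
import shelve
from datetime import datetime, timedelta, timezone

RETENTION = timedelta(hours=24)  # retention period stated above


def save_marker(path, channel, created_date, replay_id):
    """Persist a stand-in marker for a channel; the shelf is flushed on close."""
    with shelve.open(path) as db:
        db[channel] = (created_date, replay_id)


def load_marker(path, channel):
    """Return the stored marker for a channel, or None if there is none."""
    with shelve.open(path) as db:
        return db.get(channel)


def is_replayable(created_date, now=None):
    """True if an event with this createdDate is still inside the retention window."""
    now = now or datetime.now(timezone.utc)
    # Assumed timestamp format, e.g. 2024-01-01T12:00:00.000Z (UTC).
    created = datetime.strptime(created_date, "%Y-%m-%dT%H:%M:%S.%fZ")
    created = created.replace(tzinfo=timezone.utc)
    return now - created <= RETENTION
```

If the stored marker is older than the retention window, events after it may already have been discarded, so an application that was down for more than a day should not expect to recover every missed message.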