Events
Data Mesh Manager provides an HTTP Feed endpoint to subscribe to events.
Events are published in the feed in near-realtime when significant events happen to any resource managed in the application, such as a data product or an access request. You can implement a component that runs in your data platform and trigger further actions, such as permission granting or revoking.
HTTP Feed
An HTTP feed provides an HTTP GET endpoint
that returns a chronological sequence (!) of events
serialized in CloudEvents format
in batched responses using the media type application/cloudevents-batch+json
.
It uses the lastEventId
query parameter to scroll through further items
and supports infinite polling for near-realtime feed subscriptions.
When implementing a consumer, you need to have a long-running process (e.g., in a Docker container)
and a persistent storage (e.g., a database or a file in an S3 bucket) to persist the lastEventId
.
SDK
You can use the Data Mesh Manager SDK to implement a custom event handler to trigger actions in your data platform.
Data Mesh Manager as State Repository
To persist the lastEventId
, you can use any persistent storage, or you can use Data Mesh Manager to store the lastEventId
.
Register an integration agent and store and retrieve the state using the API directly or use the DataMeshManagerStateRepositoryRemote
from the SDK.
Event Listener
An events feed client polls in an infinite loop to subscribe to a feed:
Pseudocode
endpoint = "https://api.datamesh-manager.com/api/events?longPolling=false"
lastEventId = read lastEventId from database or else null
while true:
try:
response = GET endpoint + "&lastEventId=" + lastEventId
for event in response:
process event
lastEventId = event.id
persist lastEventId
if events is empty:
wait 10 seconds
except:
// Delay the next request only in case of a server error
wait 60 seconds
Set the query parameter longPolling
to false
.
A client retrieves a batched response (max. 1000 events per response) and processes one event after another.
After processing one event, the client persists the event.id as lastEventId
and continues processing the events in this response.
When all entries are processed, the client immediately continues with the next request with the last persisted lastEventId
.
When the response is an empty array, wait some seconds until making the next call.
Event Listener with Long-Polling (deprecated)
This deprecated, please don't use it anymore.
Pseudocode
endpoint = "https://api.datamesh-manager.com/api/events?longPolling=true"
lastEventId = read lastEventId from database or else null
while true:
try:
response = GET endpoint + "&lastEventId=" + lastEventId
for event in response:
process event
lastEventId = event.id
persist lastEventId
except:
// Delay the next request only in case of a server error
wait 6- seconds
A client retrieves a batched response (max. 1000 events per response) and processes one event after another.
After processing one event, the client persists the event.id as lastEventId
and continues processing the events in this response.
When all entries are processed, the client immediately continues with the next request with the last persisted lastEventId
.
When the feed contains no newer events, the server waits some seconds until new data is retrieved, or it sends an empty array []
.
Then the client immediately continues with the next request with the last persisted lastEventId
.
In case of an error, the client needs to wait for 60 seconds, until the next request is sent.
Get events
This endpoint allows you to retrieve a batch of events, starting after the lastEventId
.
Optional attributes
- Name
lastEventId
- Type
- string
- Required
- Description
The
id
(from the CloudEvent envelope) of the last processed event.
Request
curl --request GET \
--url "https://api.datamesh-manager.com/api/events?lastEventId=1c82203f-f792-4fc3-94cb-59141f1d0793" \
--header "x-api-key: $DMM_API_KEY"
Response
[
{
"specversion": "1.0",
"id": "1ee0b7f3-e6a7-60e2-ae80-2788ab757ee0",
"type": "com.datamesh-manager.events.AccessRequestedEvent",
"source": "https://app.datamesh-manager.com",
"time": "2023-06-15T13:19:23.052693Z",
"subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"datacontenttype": "application/json",
"dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessRequestedEvent",
"data": {
"id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"timestamp": "2023-06-15T13:19:23.052636441Z"
}
},
{
"specversion": "1.0",
"id": "1ee0b7f3-e7c9-6953-ae80-2d4755b4bfa9",
"type": "com.datamesh-manager.events.AccessApprovedEvent",
"source": "https://app.datamesh-manager.com",
"time": "2023-06-15T13:19:23.216357Z",
"subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"datacontenttype": "application/json",
"dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessApprovedEvent",
"data": {
"id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"timestamp": "2023-06-15T13:19:23.216330881Z"
}
},
{
"specversion": "1.0",
"id": "1ee0b7f3-e7f0-6a54-ae80-033fc3b099d2",
"type": "com.datamesh-manager.events.AccessActivatedEvent",
"source": "https://app.datamesh-manager.com",
"time": "2023-06-15T13:19:23.232604Z",
"subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"datacontenttype": "application/json",
"dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessActivatedEvent",
"data": {
"id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"timestamp": "2023-06-15T13:19:23.232578049Z"
}
},
{
"specversion": "1.0",
"id": "1ee0ea8d-faaf-6e3f-be02-bd958ebb5a5f",
"type": "com.datamesh-manager.events.AccessDeactivatedEvent",
"source": "https://app.datamesh-manager.com",
"time": "2023-06-19T13:54:56.482679Z",
"subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"datacontenttype": "application/json",
"dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessDeactivatedEvent",
"data": {
"id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
"timestamp": "2023-06-19T13:54:56.478285988Z"
}
}
]
Request
curl --request GET \
--url "https://api.datamesh-manager.com/api/events?lastEventId=1ee0ea8d-faaf-6e3f-be02-bd958ebb5a5f" \
--header "x-api-key: $DMM_API_KEY"
Response
[]
The client can filter for relevant event type
s.
And retrieve the current state of the relevant resource by its data.id
.