Events

Data Mesh Manager provides an HTTP Feed endpoint to subscribe to events.

Events are published in the feed in near-realtime when significant events happen to any resource managed in the application, such as a data product or an access request. You can implement a component that runs in your data platform and trigger further actions, such as permission granting or revoking.

HTTP Feed

An HTTP feed provides an HTTP GET endpoint that returns a chronological sequence (!) of events serialized in CloudEvents format in batched responses using the media type application/cloudevents-batch+json.

It uses the lastEventId query parameter to scroll through further items and supports infinite polling for near-realtime feed subscriptions.

When implementing a consumer, you need to have a long-running process (e.g., in a Docker container) and a persistent storage (e.g., a database or a file in an S3 bucket) to persist the lastEventId.

Read more

SDK

You can use the Data Mesh Manager SDK to implement a custom event handler to trigger actions in your data platform.

Data Mesh Manager as State Repository

To persist the lastEventId, you can use any persistent storage, or you can use Data Mesh Manager to store the lastEventId. Register a connector and store and retrieve the state using the API directly or use the DataMeshManagerStateRepositoryRemote from the SDK.

Event Listener

An events feed client polls in an infinite loop to subscribe to a feed:

Pseudocode

endpoint = "https://api.datamesh-manager.com/api/events?longPolling=false"
lastEventId = read lastEventId from database or else null

while true:
try:
  response = GET endpoint + "&lastEventId=" + lastEventId
  for event in response:
    process event
    lastEventId = event.id
    persist lastEventId
  if events is empty:
    wait 10 seconds
except:
  // Delay the next request only in case of a server error
  wait 60 seconds

Set the query parameter longPolling to false.

A client retrieves a batched response (max. 1000 events per response) and processes one event after another. After processing one event, the client persists the event.id as lastEventId and continues processing the events in this response. When all entries are processed, the client immediately continues with the next request with the last persisted lastEventId.

When the response is an empty array, wait some seconds until making the next call.

Event Listener with Long-Polling (deprecated)

This deprecated, please don't use it anymore.

Pseudocode

endpoint = "https://api.datamesh-manager.com/api/events?longPolling=true"
lastEventId = read lastEventId from database or else null

while true:
  try:
    response = GET endpoint + "&lastEventId=" + lastEventId
    for event in response:
      process event
      lastEventId = event.id
      persist lastEventId
  except:
    // Delay the next request only in case of a server error
    wait 6- seconds

A client retrieves a batched response (max. 1000 events per response) and processes one event after another. After processing one event, the client persists the event.id as lastEventId and continues processing the events in this response. When all entries are processed, the client immediately continues with the next request with the last persisted lastEventId. When the feed contains no newer events, the server waits some seconds until new data is retrieved, or it sends an empty array []. Then the client immediately continues with the next request with the last persisted lastEventId. In case of an error, the client needs to wait for 60 seconds, until the next request is sent.


GET/api/events

Get events

This endpoint allows you to retrieve a batch of events, starting after the lastEventId.

Optional attributes

  • Name
    lastEventId
    Type
    string
    Required
    Description

    The id (from the CloudEvent envelope) of the last processed event.

Request

GET
/api/events
curl --request GET \
--url "https://api.datamesh-manager.com/api/events?lastEventId=1c82203f-f792-4fc3-94cb-59141f1d0793" \
--header "x-api-key: $DMM_API_KEY"

Response

[
{
  "specversion": "1.0",
  "id": "1ee0b7f3-e6a7-60e2-ae80-2788ab757ee0",
  "type": "com.datamesh-manager.events.AccessRequestedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-15T13:19:23.052693Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessRequestedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-15T13:19:23.052636441Z"
  }
},
{
  "specversion": "1.0",
  "id": "1ee0b7f3-e7c9-6953-ae80-2d4755b4bfa9",
  "type": "com.datamesh-manager.events.AccessApprovedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-15T13:19:23.216357Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessApprovedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-15T13:19:23.216330881Z"
  }
},
{
  "specversion": "1.0",
  "id": "1ee0b7f3-e7f0-6a54-ae80-033fc3b099d2",
  "type": "com.datamesh-manager.events.AccessActivatedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-15T13:19:23.232604Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessActivatedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-15T13:19:23.232578049Z"
  }
},
{
  "specversion": "1.0",
  "id": "1ee0ea8d-faaf-6e3f-be02-bd958ebb5a5f",
  "type": "com.datamesh-manager.events.AccessDeactivatedEvent",
  "source": "https://app.datamesh-manager.com",
  "time": "2023-06-19T13:54:56.482679Z",
  "subject": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
  "datacontenttype": "application/json",
  "dataschema": "https://app.datamesh-manager.com/api/openapi.yaml#/components/schemas/AccessDeactivatedEvent",
  "data": {
    "id": "6245dfda-1087-4cb4-9ebf-4b1cc393c3dd",
    "timestamp": "2023-06-19T13:54:56.478285988Z"
  }
}
]

Request

GET
/api/events
curl --request GET \
--url "https://api.datamesh-manager.com/api/events?lastEventId=1ee0ea8d-faaf-6e3f-be02-bd958ebb5a5f" \
--header "x-api-key: $DMM_API_KEY"

Response

[]

The client can filter for relevant event types. And retrieve the current state of the relevant resource by its data.id.