1
0
mirror of synced 2025-12-23 21:03:15 -05:00

fixed bug which crashes okta log incremental sync (#7584)

* fixed bug which crashes okta log incremental sync

* bump connector version

* revert to pendulum
This commit is contained in:
Collin Scangarella
2021-11-02 20:44:10 -07:00
committed by GitHub
parent 73b8589a2b
commit c9c38e404a
4 changed files with 37 additions and 4 deletions

View File

@@ -2,6 +2,6 @@
"sourceDefinitionId": "1d4fdb25-64fc-4569-92da-fcdca79a8372",
"name": "Okta",
"dockerRepository": "airbyte/source-okta",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/okta"
}

View File

@@ -349,7 +349,7 @@
- name: Okta
sourceDefinitionId: 1d4fdb25-64fc-4569-92da-fcdca79a8372
dockerRepository: airbyte/source-okta
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/okta
sourceType: api
- name: OneSignal

View File

@@ -21,6 +21,17 @@
"destination_sync_mode": "overwrite",
"cursor_field": ["lastUpdated"],
"primary_key": [["id"]]
},
{
"stream": {
"name": "logs",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["published"],
"primary_key": [["uuid"]]
}
]
}

View File

@@ -4,6 +4,7 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
from urllib import parse
@@ -90,9 +91,14 @@ class IncrementalOktaStream(OktaStream, ABC):
)
}
def request_params(self, stream_state=None, **kwargs):
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)
params = super().request_params(stream_state, stream_slice, next_page_token)
latest_entry = stream_state.get(self.cursor_field)
if latest_entry:
params["filter"] = f'{self.cursor_field} gt "{latest_entry}"'
@@ -114,6 +120,22 @@ class Logs(IncrementalOktaStream):
def path(self, **kwargs) -> str:
return "logs"
def request_params(
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
stream_state = stream_state or {}
params = {
"limit": self.page_size,
**(next_page_token or {}),
}
latest_entry = stream_state.get(self.cursor_field)
if latest_entry:
params["since"] = latest_entry
return params
class Users(IncrementalOktaStream):
cursor_field = "lastUpdated"