1
0
mirror of synced 2026-01-10 09:04:48 -05:00

Include jobId in webhook messages (#15704)

This commit is contained in:
akawalsky
2022-08-16 15:09:10 -07:00
committed by GitHub
parent 83dd2c3de2
commit f0a69c40f8
8 changed files with 27 additions and 18 deletions

View File

@@ -63,13 +63,13 @@ public class CustomerioNotificationClient extends NotificationClient {
}
@Override
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId)
throws IOException, InterruptedException {
throw new NotImplementedException();
}
@Override
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId)
throws IOException, InterruptedException {
throw new NotImplementedException();
}

View File

@@ -23,14 +23,16 @@ public abstract class NotificationClient {
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
String logUrl,
Long jobId)
throws IOException, InterruptedException;
public abstract boolean notifyJobSuccess(
String sourceConnector,
String destinationConnector,
String jobDescription,
String logUrl)
String logUrl,
Long jobId)
throws IOException, InterruptedException;
public abstract boolean notifyConnectionDisabled(String receiverEmail,

View File

@@ -44,25 +44,27 @@ public class SlackNotificationClient extends NotificationClient {
}
@Override
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
public boolean notifyJobFailure(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId)
throws IOException, InterruptedException {
return notifyFailure(renderTemplate(
"slack/failure_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
logUrl));
logUrl,
String.valueOf(jobId)));
}
@Override
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl)
public boolean notifyJobSuccess(final String sourceConnector, final String destinationConnector, final String jobDescription, final String logUrl, final Long jobId)
throws IOException, InterruptedException {
return notifySuccess(renderTemplate(
"slack/success_slack_notification_template.txt",
sourceConnector,
destinationConnector,
jobDescription,
logUrl));
logUrl,
String.valueOf(jobId)));
}
@Override

View File

@@ -2,3 +2,5 @@ Your connection from %s to %s just failed...
This happened with %s
You can access its logs here: %s
Job ID: %s

View File

@@ -2,3 +2,5 @@ Your connection from %s to %s succeeded
This was for %s
You can access its logs here: %s
Job ID: %s

View File

@@ -40,6 +40,7 @@ class SlackNotificationClientTest {
private static final String JOB_DESCRIPTION = "job description";
private static final String LOG_URL = "logUrl";
private static final String SOURCE_TEST = "source-test";
private static final Long JOB_ID = 1L;
public static final String WEBHOOK_URL = "http://localhost:";
private static final String EXPECTED_FAIL_MESSAGE = "Your connection from source-test to destination-test just failed...\n"
@@ -81,7 +82,7 @@ class SlackNotificationClientTest {
new SlackNotificationClient(new Notification()
.withNotificationType(NotificationType.SLACK)
.withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + "/bad")));
assertThrows(IOException.class, () -> client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL));
assertThrows(IOException.class, () -> client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID));
}
@Test
@@ -89,7 +90,7 @@ class SlackNotificationClientTest {
final SlackNotificationClient client =
new SlackNotificationClient(
new Notification().withNotificationType(NotificationType.SLACK).withSlackConfiguration(new SlackNotificationConfiguration()));
assertFalse(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL));
assertFalse(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID));
}
@Test
@@ -111,7 +112,7 @@ class SlackNotificationClientTest {
new SlackNotificationClient(new Notification()
.withNotificationType(NotificationType.SLACK)
.withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + TEST_PATH)));
assertTrue(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL));
assertTrue(client.notifyJobFailure(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID));
}
@Test
@@ -122,7 +123,7 @@ class SlackNotificationClientTest {
.withNotificationType(NotificationType.SLACK)
.withSendOnSuccess(true)
.withSlackConfiguration(new SlackNotificationConfiguration().withWebhook(WEBHOOK_URL + server.getAddress().getPort() + TEST_PATH)));
assertTrue(client.notifyJobSuccess(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL));
assertTrue(client.notifyJobSuccess(SOURCE_TEST, DESTINATION_TEST, JOB_DESCRIPTION, LOG_URL, JOB_ID));
}
@Test

View File

@@ -103,12 +103,12 @@ public class JobNotifier {
MoreMaps.merge(jobMetadata, sourceMetadata, destinationMetadata, notificationMetadata.build()));
if (FAILURE_NOTIFICATION == action) {
if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl)) {
if (!notificationClient.notifyJobFailure(sourceConnector, destinationConnector, jobDescription, logUrl, job.getId())) {
LOGGER.warn("Failed to successfully notify failure: {}", notification);
}
break;
} else if (SUCCESS_NOTIFICATION == action) {
if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl)) {
if (!notificationClient.notifyJobSuccess(sourceConnector, destinationConnector, jobDescription, logUrl, job.getId())) {
LOGGER.warn("Failed to successfully notify success: {}", notification);
}
break;

View File

@@ -4,8 +4,7 @@
package io.airbyte.scheduler.persistence;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -85,7 +84,7 @@ class JobNotifierTest {
when(configRepository.getStandardDestinationDefinition(any())).thenReturn(destinationDefinition);
when(configRepository.getStandardWorkspace(WORKSPACE_ID, true)).thenReturn(getWorkspace());
when(workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(job.getId())).thenReturn(WORKSPACE_ID);
when(notificationClient.notifyJobFailure(anyString(), anyString(), anyString(), anyString())).thenReturn(true);
when(notificationClient.notifyJobFailure(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(true);
jobNotifier.failJob("JobNotifierTest was running", job);
final DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.FULL).withZone(ZoneId.systemDefault());
@@ -94,7 +93,8 @@ class JobNotifierTest {
"destination-test",
String.format("sync started on %s, running for 1 day 10 hours 17 minutes 36 seconds, as the JobNotifierTest was running.",
formatter.format(Instant.ofEpochSecond(job.getStartedAtInSecond().get()))),
String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, job.getScope()));
String.format("http://localhost:8000/workspaces/%s/connections/%s", WORKSPACE_ID, job.getScope()),
job.getId());
final Builder<String, Object> metadata = ImmutableMap.builder();
metadata.put("connection_id", UUID.fromString(job.getScope()));