add final for params, local variables, and fields (#7084)
This commit is contained in:
@@ -32,7 +32,7 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
public KafkaSource() {}
|
||||
|
||||
@Override
|
||||
public AirbyteConnectionStatus check(JsonNode config) {
|
||||
public AirbyteConnectionStatus check(final JsonNode config) {
|
||||
try {
|
||||
final String testTopic = config.has("test_topic") ? config.get("test_topic").asText() : "";
|
||||
if (!testTopic.isBlank()) {
|
||||
@@ -44,7 +44,7 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
LOGGER.info("Successfully connected to Kafka brokers for topic '{}'.", config.get("test_topic").asText());
|
||||
}
|
||||
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
|
||||
} catch (Exception e) {
|
||||
} catch (final Exception e) {
|
||||
LOGGER.error("Exception attempting to connect to the Kafka brokers: ", e);
|
||||
return new AirbyteConnectionStatus()
|
||||
.withStatus(Status.FAILED)
|
||||
@@ -53,10 +53,10 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AirbyteCatalog discover(JsonNode config) throws Exception {
|
||||
public AirbyteCatalog discover(final JsonNode config) throws Exception {
|
||||
|
||||
Set<String> topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe();
|
||||
List<AirbyteStream> streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers
|
||||
final Set<String> topicsToSubscribe = KafkaSourceConfig.getKafkaSourceConfig(config).getTopicsToSubscribe();
|
||||
final List<AirbyteStream> streams = topicsToSubscribe.stream().map(topic -> CatalogHelpers
|
||||
.createAirbyteStream(topic, Field.of("value", JsonSchemaPrimitive.STRING))
|
||||
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
|
||||
.collect(Collectors.toList());
|
||||
@@ -64,7 +64,8 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableIterator<AirbyteMessage> read(JsonNode config, ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception {
|
||||
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
|
||||
throws Exception {
|
||||
final AirbyteConnectionStatus check = check(config);
|
||||
if (check.getStatus().equals(AirbyteConnectionStatus.Status.FAILED)) {
|
||||
throw new RuntimeException("Unable establish a connection: " + check.getMessage());
|
||||
@@ -72,9 +73,9 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
|
||||
final KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.getKafkaSourceConfig(config);
|
||||
final KafkaConsumer<String, JsonNode> consumer = kafkaSourceConfig.getConsumer();
|
||||
List<ConsumerRecord<String, JsonNode>> recordsList = new ArrayList<>();
|
||||
final List<ConsumerRecord<String, JsonNode>> recordsList = new ArrayList<>();
|
||||
|
||||
int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
|
||||
final int retry = config.has("repeated_calls") ? config.get("repeated_calls").intValue() : 0;
|
||||
int pollCount = 0;
|
||||
while (true) {
|
||||
final ConsumerRecords<String, JsonNode> consumerRecords = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
|
||||
@@ -93,14 +94,14 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
consumer.commitAsync();
|
||||
}
|
||||
consumer.close();
|
||||
Iterator<ConsumerRecord<String, JsonNode>> iterator = recordsList.iterator();
|
||||
final Iterator<ConsumerRecord<String, JsonNode>> iterator = recordsList.iterator();
|
||||
|
||||
return AutoCloseableIterators.fromIterator(new AbstractIterator<>() {
|
||||
|
||||
@Override
|
||||
protected AirbyteMessage computeNext() {
|
||||
if (iterator.hasNext()) {
|
||||
ConsumerRecord<String, JsonNode> record = iterator.next();
|
||||
final ConsumerRecord<String, JsonNode> record = iterator.next();
|
||||
return new AirbyteMessage()
|
||||
.withType(AirbyteMessage.Type.RECORD)
|
||||
.withRecord(new AirbyteRecordMessage()
|
||||
@@ -115,7 +116,7 @@ public class KafkaSource extends BaseConnector implements Source {
|
||||
});
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
public static void main(final String[] args) throws Exception {
|
||||
final Source source = new KafkaSource();
|
||||
LOGGER.info("Starting source: {}", KafkaSource.class);
|
||||
new IntegrationRunner(source).run(args);
|
||||
|
||||
@@ -33,18 +33,18 @@ public class KafkaSourceConfig {
|
||||
private KafkaConsumer<String, JsonNode> consumer;
|
||||
private Set<String> topicsToSubscribe;
|
||||
|
||||
private KafkaSourceConfig(JsonNode config) {
|
||||
private KafkaSourceConfig(final JsonNode config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
public static KafkaSourceConfig getKafkaSourceConfig(JsonNode config) {
|
||||
public static KafkaSourceConfig getKafkaSourceConfig(final JsonNode config) {
|
||||
if (instance == null) {
|
||||
instance = new KafkaSourceConfig(config);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private KafkaConsumer<String, JsonNode> buildKafkaConsumer(JsonNode config) {
|
||||
private KafkaConsumer<String, JsonNode> buildKafkaConsumer(final JsonNode config) {
|
||||
final Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get("bootstrap_servers").asText());
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG,
|
||||
@@ -76,8 +76,8 @@ public class KafkaSourceConfig {
|
||||
return new KafkaConsumer<>(filteredProps);
|
||||
}
|
||||
|
||||
private Map<String, Object> propertiesByProtocol(JsonNode config) {
|
||||
JsonNode protocolConfig = config.get("protocol");
|
||||
private Map<String, Object> propertiesByProtocol(final JsonNode config) {
|
||||
final JsonNode protocolConfig = config.get("protocol");
|
||||
LOGGER.info("Kafka protocol config: {}", protocolConfig.toString());
|
||||
final KafkaProtocol protocol = KafkaProtocol.valueOf(protocolConfig.get("security_protocol").asText().toUpperCase());
|
||||
final ImmutableMap.Builder<String, Object> builder = ImmutableMap.<String, Object>builder()
|
||||
@@ -101,11 +101,11 @@ public class KafkaSourceConfig {
|
||||
}
|
||||
consumer = buildKafkaConsumer(config);
|
||||
|
||||
JsonNode subscription = config.get("subscription");
|
||||
final JsonNode subscription = config.get("subscription");
|
||||
LOGGER.info("Kafka subscribe method: {}", subscription.toString());
|
||||
switch (subscription.get("subscription_type").asText()) {
|
||||
case "subscribe" -> {
|
||||
String topicPattern = subscription.get("topic_pattern").asText();
|
||||
final String topicPattern = subscription.get("topic_pattern").asText();
|
||||
consumer.subscribe(Pattern.compile(topicPattern));
|
||||
topicsToSubscribe = consumer.listTopics().keySet().stream()
|
||||
.filter(topic -> topic.matches(topicPattern))
|
||||
@@ -113,10 +113,10 @@ public class KafkaSourceConfig {
|
||||
}
|
||||
case "assign" -> {
|
||||
topicsToSubscribe = new HashSet<>();
|
||||
String topicPartitions = subscription.get("topic_partitions").asText();
|
||||
String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(",");
|
||||
List<TopicPartition> topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> {
|
||||
String[] pair = topicPartition.split(":");
|
||||
final String topicPartitions = subscription.get("topic_partitions").asText();
|
||||
final String[] topicPartitionsStr = topicPartitions.replaceAll("\\s+", "").split(",");
|
||||
final List<TopicPartition> topicPartitionList = Arrays.stream(topicPartitionsStr).map(topicPartition -> {
|
||||
final String[] pair = topicPartition.split(":");
|
||||
topicsToSubscribe.add(pair[0]);
|
||||
return new TopicPartition(pair[0], Integer.parseInt(pair[1]));
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
@@ -50,8 +50,8 @@ public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
|
||||
@Override
|
||||
protected JsonNode getConfig() {
|
||||
ObjectNode protocolConfig = mapper.createObjectNode();
|
||||
ObjectNode subscriptionConfig = mapper.createObjectNode();
|
||||
final ObjectNode protocolConfig = mapper.createObjectNode();
|
||||
final ObjectNode subscriptionConfig = mapper.createObjectNode();
|
||||
protocolConfig.put("security_protocol", KafkaProtocol.PLAINTEXT.toString());
|
||||
subscriptionConfig.put("subscription_type", "subscribe");
|
||||
subscriptionConfig.put("topic_pattern", TOPIC_NAME);
|
||||
@@ -69,7 +69,7 @@ public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
|
||||
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
|
||||
KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"));
|
||||
KAFKA.start();
|
||||
|
||||
@@ -83,9 +83,9 @@ public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
|
||||
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())
|
||||
.build();
|
||||
KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(props);
|
||||
final KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(props);
|
||||
|
||||
ObjectNode event = mapper.createObjectNode();
|
||||
final ObjectNode event = mapper.createObjectNode();
|
||||
event.put("test", "value");
|
||||
|
||||
producer.send(new ProducerRecord<>(TOPIC_NAME, event), (recordMetadata, exception) -> {
|
||||
@@ -96,14 +96,14 @@ public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
}
|
||||
|
||||
private void createTopic() throws Exception {
|
||||
try (var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
|
||||
NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
|
||||
try (final var admin = AdminClient.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()))) {
|
||||
final NewTopic topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
|
||||
admin.createTopics(Collections.singletonList(topic)).all().get();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown(TestDestinationEnv testEnv) {
|
||||
protected void tearDown(final TestDestinationEnv testEnv) {
|
||||
KAFKA.close();
|
||||
}
|
||||
|
||||
@@ -114,7 +114,8 @@ public class KafkaSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
|
||||
@Override
|
||||
protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception {
|
||||
ConfiguredAirbyteStream streams = CatalogHelpers.createConfiguredAirbyteStream(TOPIC_NAME, null, Field.of("value", JsonSchemaPrimitive.STRING));
|
||||
final ConfiguredAirbyteStream streams =
|
||||
CatalogHelpers.createConfiguredAirbyteStream(TOPIC_NAME, null, Field.of("value", JsonSchemaPrimitive.STRING));
|
||||
streams.setSyncMode(SyncMode.FULL_REFRESH);
|
||||
return new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(streams));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user