mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
fix(core): CrudEvent should not be done on the repository side for KV
This commit is contained in:
committed by
brian-mulier-p
parent
1c097209ac
commit
07e90de835
@@ -63,7 +63,7 @@ public class MetadataMigrationCommandTest {
|
||||
|
||||
|
||||
assertThat(out.toString()).contains("✅ Metadata migration complete.");
|
||||
// Still it's not in the metadata repository because no flow exist to find that secret
|
||||
// Still it's not in the metadata repository because no flow exist to find that kv
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, namespace, key).isPresent()).isFalse();
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
|
||||
|
||||
@@ -79,9 +79,9 @@ public class MetadataMigrationCommandTest {
|
||||
PicocliRunner.call(App.class, ctx, kvMetadataMigrationCommand);
|
||||
|
||||
assertThat(out.toString()).contains("✅ Metadata migration complete.");
|
||||
Optional<PersistedKvMetadata> foundSecret = kvMetadataRepository.findByName(tenantId, namespace, key);
|
||||
assertThat(foundSecret.isPresent()).isTrue();
|
||||
assertThat(foundSecret.get().getDescription()).isEqualTo(description);
|
||||
Optional<PersistedKvMetadata> foundKv = kvMetadataRepository.findByName(tenantId, namespace, key);
|
||||
assertThat(foundKv.isPresent()).isTrue();
|
||||
assertThat(foundKv.get().getDescription()).isEqualTo(description);
|
||||
assertThat(kvMetadataRepository.findByName(tenantId, anotherNamespace, anotherKey).isPresent()).isFalse();
|
||||
|
||||
KVStore kvStore = new InternalKVStore(tenantId, namespace, storage, kvMetadataRepository);
|
||||
|
||||
@@ -20,9 +20,8 @@ import java.util.Map;
|
||||
@H2RepositoryEnabled
|
||||
public class H2KvMetadataRepository extends AbstractJdbcKvMetadataRepository {
|
||||
@Inject
|
||||
public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository,
|
||||
ApplicationContext applicationContext) {
|
||||
super(repository, applicationContext);
|
||||
public H2KvMetadataRepository(@Named("kvMetadata") H2Repository<PersistedKvMetadata> repository) {
|
||||
super(repository);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -17,10 +17,9 @@ import java.util.List;
|
||||
public class MysqlKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
|
||||
@Inject
|
||||
public MysqlKvMetadataRepository(
|
||||
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository,
|
||||
ApplicationContext applicationContext
|
||||
@Named("kvMetadata") MysqlRepository<PersistedKvMetadata> repository
|
||||
) {
|
||||
super(repository, applicationContext);
|
||||
super(repository);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -17,10 +17,9 @@ import java.util.List;
|
||||
public class PostgresKvMetadataRepository extends AbstractJdbcKvMetadataRepository {
|
||||
@Inject
|
||||
public PostgresKvMetadataRepository(
|
||||
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository,
|
||||
ApplicationContext applicationContext
|
||||
@Named("kvMetadata") PostgresRepository<PersistedKvMetadata> repository
|
||||
) {
|
||||
super(repository, applicationContext);
|
||||
super(repository);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -1,14 +1,11 @@
|
||||
package io.kestra.jdbc.repository;
|
||||
|
||||
import io.kestra.core.events.CrudEvent;
|
||||
import io.kestra.core.models.FetchVersion;
|
||||
import io.kestra.core.models.QueryFilter;
|
||||
import io.kestra.core.models.TenantAndNamespace;
|
||||
import io.kestra.core.models.kv.PersistedKvMetadata;
|
||||
import io.kestra.core.repositories.ArrayListTotal;
|
||||
import io.kestra.core.repositories.KvMetadataRepositoryInterface;
|
||||
import io.micronaut.context.ApplicationContext;
|
||||
import io.micronaut.context.event.ApplicationEventPublisher;
|
||||
import io.micronaut.data.model.Pageable;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.jooq.*;
|
||||
@@ -22,15 +19,12 @@ import java.util.stream.Stream;
|
||||
|
||||
public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepository implements KvMetadataRepositoryInterface {
|
||||
protected final io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository;
|
||||
private final ApplicationEventPublisher<CrudEvent<PersistedKvMetadata>> eventPublisher;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public AbstractJdbcKvMetadataRepository(
|
||||
io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository,
|
||||
ApplicationContext applicationContext
|
||||
io.kestra.jdbc.AbstractJdbcRepository<PersistedKvMetadata> jdbcRepository
|
||||
) {
|
||||
this.jdbcRepository = jdbcRepository;
|
||||
this.eventPublisher = applicationContext.getBean(ApplicationEventPublisher.class);
|
||||
}
|
||||
|
||||
private static Condition lastCondition(boolean isLast) {
|
||||
@@ -138,13 +132,7 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepos
|
||||
))).toList()));
|
||||
}
|
||||
|
||||
int deletedAmount = deleteCondition
|
||||
.execute();
|
||||
|
||||
e.getValue().forEach(kvMetadata -> eventPublisher.publishEvent(CrudEvent.of(
|
||||
kvMetadata,
|
||||
null
|
||||
)));
|
||||
int deletedAmount = deleteCondition.execute();
|
||||
|
||||
return totalForTenantNamespace + deletedAmount;
|
||||
}, Integer::sum);
|
||||
@@ -173,10 +161,6 @@ public abstract class AbstractJdbcKvMetadataRepository extends AbstractJdbcRepos
|
||||
}
|
||||
}
|
||||
|
||||
eventPublisher.publishEvent(CrudEvent.of(
|
||||
maybePrevious.orElse(null),
|
||||
kvMetadataToPersist
|
||||
));
|
||||
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(kvMetadataToPersist);
|
||||
this.jdbcRepository.persist(kvMetadataToPersist, context, fields);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user