feat(core): #3427 add OSS tenant migration scripts (#8798)

* feat(core): #3427 add OSS tenant migration scripts

* clean(core): fixes after review

* clean(core): make only one command for oss and EE migration

* fix(core): user synchronisation command and clean PR

---------

Co-authored-by: nKwiatkowski <nkwiatkowski@kestra.io>
This commit is contained in:
Nicolas K.
2025-05-19 16:42:59 +02:00
committed by GitHub
parent 9cdc3c54a3
commit 734fcbc45b
9 changed files with 271 additions and 0 deletions

View File

@@ -2,6 +2,7 @@ package io.kestra.cli;
import io.kestra.cli.commands.configs.sys.ConfigCommand;
import io.kestra.cli.commands.flows.FlowCommand;
import io.kestra.cli.commands.migrations.MigrationCommand;
import io.kestra.cli.commands.namespaces.NamespaceCommand;
import io.kestra.cli.commands.plugins.PluginCommand;
import io.kestra.cli.commands.servers.ServerCommand;
@@ -42,6 +43,7 @@ import java.util.concurrent.Callable;
SysCommand.class,
ConfigCommand.class,
NamespaceCommand.class,
MigrationCommand.class,
}
)
@Introspected

View File

@@ -0,0 +1,29 @@
package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.cli.App;
import io.micronaut.configuration.picocli.PicocliRunner;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
@CommandLine.Command(
name = "migrate",
description = "handle migrations",
mixinStandardHelpOptions = true,
subcommands = {
TenantMigrationCommand.class,
}
)
@Slf4j
public class MigrationCommand extends AbstractCommand {
@SneakyThrows
@Override
public Integer call() throws Exception {
super.call();
PicocliRunner.call(App.class, "migrate", "--help");
return 0;
}
}

View File

@@ -0,0 +1,49 @@
package io.kestra.cli.commands.migrations;
import io.kestra.cli.AbstractCommand;
import io.kestra.core.repositories.TenantMigrationInterface;
import io.micronaut.context.ApplicationContext;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import picocli.CommandLine;
import picocli.CommandLine.Option;
@CommandLine.Command(
name = "tenant",
description = "migrate every elements from no tenant to the main tenant"
)
@Slf4j
public class TenantMigrationCommand extends AbstractCommand {
@Inject
private ApplicationContext applicationContext;
@Option(names = "--tenant-id", description = "tenant identifier")
String tenantId;
@Option(names = "--tenant-name", description = "tenant name")
String tenantName;
@Option(names = "--dry-run", description = "Preview only, do not update")
boolean dryRun;
@Override
public Integer call() throws Exception {
super.call();
if (dryRun) {
System.out.println("🧪 Dry-run mode enabled. No changes will be applied.");
}
TenantMigrationService migrationService = this.applicationContext.getBean(TenantMigrationService.class);
try {
migrationService.migrateTenant(tenantId, tenantName, dryRun);
System.out.println("✅ Tenant migration complete.");
} catch (Exception e) {
System.err.println("❌ Tenant migration failed: " + e.getMessage());
e.printStackTrace();
return 1;
}
return 0;
}
}

View File

@@ -0,0 +1,29 @@
package io.kestra.cli.commands.migrations;
import static io.kestra.core.tenant.TenantService.MAIN_TENANT;
import com.github.javaparser.utils.Log;
import io.kestra.core.exceptions.KestraRuntimeException;
import io.kestra.core.repositories.TenantMigrationInterface;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@Singleton
@Slf4j
public class TenantMigrationService {
@Inject
private TenantMigrationInterface tenantMigrationInterface;
public void migrateTenant(String tenantId, String tenantName, boolean dryRun) {
if (StringUtils.isNotBlank(tenantId) && !MAIN_TENANT.equals(tenantId)){
throw new KestraRuntimeException("Tenant configuration is an enterprise feature. It can only be main in OSS");
}
Log.info("🔁 Starting tenant migration...");
tenantMigrationInterface.migrateTenant(MAIN_TENANT, dryRun);
}
}

View File

@@ -0,0 +1,7 @@
package io.kestra.core.repositories;
public interface TenantMigrationInterface {
void migrateTenant(String tenantId, boolean dryRun);
}

View File

@@ -0,0 +1,27 @@
package io.kestra.repository.h2;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcTenantMigration;
import jakarta.inject.Singleton;
import org.jooq.DSLContext;
import org.jooq.Table;
@Singleton
@H2RepositoryEnabled
public class H2TenantMigration extends AbstractJdbcTenantMigration {
protected H2TenantMigration(JooqDSLContextWrapper dslContextWrapper) {
super(dslContextWrapper);
}
@Override
protected int updateTenantId(Table<?> table, DSLContext context) {
String query = """
UPDATE "%s"
SET "value" = '{"tenantId":"%s",' || SUBSTRING("value", 2)
WHERE JQ_STRING("value", '.tenantId') IS NULL
""".formatted(table.getName(), "main");
return context.execute(query, "main");
}
}

View File

@@ -0,0 +1,25 @@
package io.kestra.repository.mysql;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcTenantMigration;
import jakarta.inject.Singleton;
import org.jooq.DSLContext;
import org.jooq.Table;
@Singleton
@MysqlRepositoryEnabled
public class MysqlTenantMigration extends AbstractJdbcTenantMigration {
protected MysqlTenantMigration(JooqDSLContextWrapper dslContextWrapper) {
super(dslContextWrapper);
}
@Override
protected int updateTenantId(Table<?> table, DSLContext context) {
String query = "UPDATE `" + table.getName() + "` " +
"SET `value` = JSON_SET(`value`, '$.tenantId', ?) " +
"WHERE JSON_UNQUOTE(JSON_EXTRACT(`value`, '$.tenantId')) IS NULL";
return context.execute(query, "main");
}
}

View File

@@ -0,0 +1,26 @@
package io.kestra.repository.postgres;
import io.kestra.jdbc.JooqDSLContextWrapper;
import io.kestra.jdbc.repository.AbstractJdbcTenantMigration;
import jakarta.inject.Singleton;
import org.jooq.DSLContext;
import org.jooq.Table;
@Singleton
@PostgresRepositoryEnabled
public class PostgresTenantMigration extends AbstractJdbcTenantMigration {
protected PostgresTenantMigration(
JooqDSLContextWrapper dslContextWrapper) {
super(dslContextWrapper);
}
@Override
protected int updateTenantId(Table<?> table, DSLContext context) {
String query = "UPDATE " + table.getQualifiedName() + " " +
"SET value = jsonb_set(value, '{tenantId}', ?::jsonb) " +
"WHERE (value->>'tenantId') IS NULL";
return context.execute(query, "\"main\"");
}
}

View File

@@ -0,0 +1,77 @@
package io.kestra.jdbc.repository;
import io.kestra.core.repositories.TenantMigrationInterface;
import io.kestra.jdbc.JooqDSLContextWrapper;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.jooq.Condition;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Table;
import org.jooq.impl.DSL;
@Slf4j
public abstract class AbstractJdbcTenantMigration implements TenantMigrationInterface {
protected final JooqDSLContextWrapper dslContextWrapper;
protected AbstractJdbcTenantMigration(JooqDSLContextWrapper dslContextWrapper) {
this.dslContextWrapper = dslContextWrapper;
}
public void migrateTenant(String tenantId, boolean dryRun) {
migrate(dryRun);
}
public void migrate(boolean dryRun) {
List<Table<?>> tables = dslContextWrapper.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return context.meta().getTables();
});
log.info("📦 Found {} tables.\n", tables.size());
int totalAffected = 0;
for (Table<?> table : tables) {
Field<String> tenantField = table.field("tenant_id", String.class);
if (tenantField == null) {
continue;
}
if (!dryRun) {
int updated = dslContextWrapper.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return updateTenantId(table, context);
});
totalAffected += updated;
log.info("✅ Updated {} row(s) in {}", updated, table.getName());
} else {
Condition condition = tenantField.isNull();
int count = dslContextWrapper.transactionResult(configuration -> {
DSLContext context = DSL.using(configuration);
return context.selectCount()
.from(table)
.where(condition)
.fetchOne(0, int.class);
});
if (count > 0) {
log.info("🔸 {}: {} row(s) to update.", table.getName(), count);
totalAffected += count;
} else {
log.info("✅ {}: No updates needed.", table.getName());
}
}
}
if (dryRun) {
log.info("🧪 Dry-run complete. {} row(s) would be updated.", totalAffected);
} else {
log.info("✅ Update complete. {} row(s) updated.", totalAffected);
}
}
protected abstract int updateTenantId(Table<?> table, DSLContext context);
}