800 lines
34 KiB
Plaintext
800 lines
34 KiB
Plaintext
<%@ jet
|
|
imports="
|
|
org.talend.designer.codegen.config.CodeGeneratorArgument
|
|
org.talend.core.model.process.INode
|
|
org.talend.core.model.process.ElementParameterParser
|
|
org.talend.core.model.metadata.IMetadataTable
|
|
org.talend.core.model.metadata.types.JavaTypesManager
|
|
org.talend.core.model.metadata.MappingTypeRetriever
|
|
org.talend.core.model.metadata.MetadataTalendType
|
|
org.talend.core.model.process.IConnection
|
|
org.talend.core.model.utils.NodeUtil
|
|
java.util.List
|
|
java.util.ArrayList
|
|
java.util.Map
|
|
java.util.HashMap
|
|
"
|
|
skeleton="../templates/db_output_bulk.skeleton"
|
|
%>
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/Log4j/DBLogUtil.javajet"%>
|
|
<%
|
|
CodeGeneratorArgument codeGenArgument = (CodeGeneratorArgument) argument;
|
|
INode node = (INode)codeGenArgument.getArgument();
|
|
String cid = node.getUniqueName();
|
|
|
|
String storageType = ElementParameterParser.getValue(node, "__AZURE_STORAGE__");
|
|
String accountName = ElementParameterParser.getValue(node, "__ACCOUNT_NAME__");
|
|
|
|
String clientId = ElementParameterParser.getValue(node, "__CLIENT_ID__");
|
|
String oauthEndpoint = ElementParameterParser.getValue(node, "__ENDPOINT__");
|
|
|
|
String container = ElementParameterParser.getValue(node, "__CONTAINER__");
|
|
|
|
String azureFolder = ElementParameterParser.getValue(node, "__AZURE_LOCATION__");
|
|
|
|
boolean isLog4jEnabled = ("true").equals(ElementParameterParser.getValue(node.getProcess(), "__LOG4J_ACTIVATE__"));
|
|
%>
|
|
|
|
//AZURE DW CODE
|
|
|
|
<%
|
|
dbLog = new DBLogUtil(node);
|
|
|
|
String dbname= ElementParameterParser.getValue(node, "__DBNAME__");
|
|
|
|
String dbproperties = ElementParameterParser.getValue(node, "__PROPERTIES__");
|
|
|
|
String dbhost = ElementParameterParser.getValue(node, "__HOST__");
|
|
|
|
String dbport = ElementParameterParser.getValue(node, "__PORT__");
|
|
|
|
String dbschema = ElementParameterParser.getValue(node, "__DB_SCHEMA__");
|
|
|
|
String dbuser= ElementParameterParser.getValue(node, "__USER__");
|
|
|
|
String table = ElementParameterParser.getValue(node,"__TABLE__");
|
|
|
|
String dbmsId = ElementParameterParser.getValue(node,"__MAPPING__");
|
|
|
|
getManager(dbmsId, cid, node);//register the MSSQLManager
|
|
|
|
String schemaList = ElementParameterParser.getValue(node,"__MANUAL_SCHEMA_LIST__");
|
|
|
|
String tableAction = ElementParameterParser.getValue(node,"__TABLE_ACTION__");
|
|
|
|
String fieldSeparator = ElementParameterParser.getValueWithUIFieldKey(node,"__FIELDSEPARATOR__","FIELDSEPARATOR");
|
|
|
|
String loadFileFormat = ElementParameterParser.getValue(node,"__IMPORT_TYPE__");
|
|
|
|
boolean dateFormat = "true".equals(ElementParameterParser.getValue(node,"__DATE_FORMAT__"));
|
|
|
|
String datePattern = ElementParameterParser.getValue(node,"__DATE_PATTERN__");
|
|
|
|
boolean useStringDelimiter = "true".equals(ElementParameterParser.getValue(node,"__USE_STRING_DELIMITER__"));
|
|
|
|
String stringDelimiter = ElementParameterParser.getValueWithUIFieldKey(node,"__STRINGDELIMITER__","STRINGDELIMITER");
|
|
|
|
boolean isTypeDefault = "true".equals(ElementParameterParser.getValue(node,"__USE_TYPE_DEFAULT__"));
|
|
|
|
boolean isCompress = "true".equals(ElementParameterParser.getValue(node, "__COMPRESS__"));
|
|
|
|
// SerDe method parameter; it is only written into the file format DDL for HIVE_RCFILE.
String serdeMethod = ElementParameterParser.getValue(node,"__SERDE_METHOD__");

// One entry per supported load format:
//   [0] keyword emitted as FORMAT_TYPE, [1] component parameter holding that format's compression codec.
Map<String, String[]> formatSettings = new HashMap<String, String[]>();
formatSettings.put("DELIMITED_TEXT", new String[] {"DelimitedText", "__COMPRESSION_TEXT__"});
formatSettings.put("HIVE_RCFILE", new String[] {"RCFILE", "__COMPRESSION_RC__"});
formatSettings.put("HIVE_ORC", new String[] {"ORC", "__COMPRESSION_ORC__"});
formatSettings.put("PARQUET", new String[] {"PARQUET", "__COMPRESSION_PARQUET__"});
String[] selectedFormat = formatSettings.get(loadFileFormat);

// The codec is only looked up when compression is enabled; it stays null otherwise
// and for a load format that is not listed above.
String compressionType = (isCompress && selectedFormat != null)
        ? ElementParameterParser.getValue(node, selectedFormat[1])
        : null;

// Stays null for a load format that is not listed above.
String fileFormatType = (selectedFormat == null) ? null : selectedFormat[0];
|
|
// External Table Reject Options
|
|
|
|
boolean isEnabledExtTableOptions = "true".equals(ElementParameterParser.getValue(node, "__EXT_TABLE_OPTIONS__"));
|
|
|
|
String rejectType = ElementParameterParser.getValue(node,"__REJECT_TYPE__");
|
|
|
|
String rejectValue = ElementParameterParser.getValue(node,"__REJECT_VALUE__");
|
|
|
|
String rejectSampleValue = ElementParameterParser.getValue(node,"__REJECT_SAMPLE_VALUE__");
|
|
|
|
String DistributionOption = ElementParameterParser.getValue(node,"__DISTRIBUTION__");
|
|
|
|
String DistributionColumnName = ElementParameterParser.getValue(node,"__DISTRIBUTION_COLUMN_NAME__");
|
|
|
|
String TableOption = ElementParameterParser.getValue(node,"__TABLE_OPTION__");
|
|
|
|
String IndexColumns = ElementParameterParser.getValue(node,"__INDEX_COLUMNS__");
|
|
|
|
boolean isEnabled_Partition = "true".equals(ElementParameterParser.getValue(node, "__PARTITION__"));
|
|
|
|
String PartitionColumnName = ElementParameterParser.getValue(node,"__PARTITION_COLUMN_NAME__");
|
|
|
|
String PartitionRange = ElementParameterParser.getValue(node,"__PARTITION_COLUMN_NAME_RANGE__");
|
|
|
|
String PartitionValues = ElementParameterParser.getValue(node,"__PARTITION_VALUES__");
|
|
|
|
// External Table Reject Options END
|
|
|
|
%>
|
|
|
|
final String OUT_DELIM_<%=cid %> = <%=fieldSeparator %>;
|
|
|
|
String randomIdentifier_<%=cid%> = java.util.UUID.randomUUID().toString().replaceAll("-","");
|
|
|
|
String azureCredIdentity_<%=cid%> = "";
|
|
String azureCredName_<%=cid%> = "";
|
|
String azureContainer_<%=cid%> = "";
|
|
|
|
String dbschema_<%=cid%> = null;
|
|
//external tableName
|
|
String ext_tableName_<%=cid%> = null;
|
|
|
|
String tableName_<%=cid%> = null;
|
|
String azureFolder_<%=cid%> = <%=azureFolder%>;
|
|
|
|
String clientId_<%=cid%> = <%=clientId%>;
|
|
String oauthEndpoint_<%=cid%> = <%=oauthEndpoint%>;
|
|
|
|
<%
|
|
// Parameter that holds the storage access key: __PRINCIPAL_KEY__ for ADLS, __SECRET_KEY__ for anything else.
// The variable is deliberately not final: it is reassigned further down before password.javajet is included.
String passwordFieldName = "ADLS".equals(storageType) ? "__PRINCIPAL_KEY__" : "__SECRET_KEY__";
boolean accessKeyIsEncrypted = ElementParameterParser.canEncrypt(node, passwordFieldName);
if (!accessKeyIsEncrypted) {
    // The value cannot be encrypted: emit the raw parameter expression.
%>
final String azureAccessKey_<%=cid%> = <%= ElementParameterParser.getValue(node, passwordFieldName)%>;
<%
} else {
    // Emit the encrypted value and decrypt it in the generated code.
%>
final String azureAccessKey_<%=cid%> = routines.system.PasswordEncryptUtil.decryptPassword(<%=ElementParameterParser.getEncryptedValue(node, passwordFieldName)%>);
<%
}
|
|
|
|
if ("ADLS".equals(storageType)) {
|
|
%>
|
|
azureCredIdentity_<%=cid%> = clientId_<%=cid%> + "@" + oauthEndpoint_<%=cid%>;
|
|
azureCredName_<%=cid%> = "talend_ADLCredential_" + randomIdentifier_<%=cid%>;
|
|
<%
|
|
} else {
|
|
%>
|
|
azureCredIdentity_<%=cid%> = "talend";
|
|
azureCredName_<%=cid%> = "talend_AzureStorageCredential_" + randomIdentifier_<%=cid%>;
|
|
azureContainer_<%=cid%> = <%=container%>;
|
|
<%
|
|
}
|
|
%>
|
|
|
|
String azureAccountName_<%=cid%> = <%=accountName%>;
|
|
String azureExtDataSrc_<%=cid%> = "talend_DataSource_" + randomIdentifier_<%=cid%>;
|
|
String azureExtFileFormat_<%=cid%> = "talend_FileFormat_" + randomIdentifier_<%=cid%>;
|
|
|
|
String azureFileLoadType_<%=cid%> = "<%=loadFileFormat%>";
|
|
|
|
String dwDistributionOptions_<%=cid%> = "";
|
|
String dwTableOptions_<%=cid%> = "";
|
|
String dwPartition_<%=cid%> = "";
|
|
|
|
<%
|
|
// Columns of the component schema, as returned by the getColumnList helper.
List<IMetadataColumn> columnList = getColumnList(node);

// Column structure handed to manager.getCreateTableSQL(...) further down; stays null when there are no columns.
List<Column> stmtStructure = null;

// Assigned later, just before the external objects are created.
Manager manager = null;

// Never set to true in this template; kept because the generation guard below still tests it.
boolean isDynamic = false;

// The former node.getMetadataList() lookup was removed: its result was never read.
if (columnList != null && !columnList.isEmpty()) {
    stmtStructure = getManager(dbmsId, cid).createColumnList(columnList, false, null, null);
}
|
|
|
|
%>
|
|
|
|
//BEGIN CONNECTION Code
|
|
|
|
java.sql.Connection conn_<%=cid%> = null;
|
|
<%
|
|
boolean useExistingConnection = "true".equals(ElementParameterParser.getValue(node,"__USE_EXISTING_CONNECTION__"));
|
|
%>
|
|
String dbUser_<%=cid %> = null;
|
|
<%
|
|
if(useExistingConnection) {
|
|
String connection = ElementParameterParser.getValue(node,"__CONNECTION__");
|
|
String conn = "conn_" + connection;
|
|
String schema = "dbschema_" + connection;
|
|
%>
|
|
dbschema_<%=cid%> = (String)globalMap.get("<%=schema%>");
|
|
conn_<%=cid%> = (java.sql.Connection)globalMap.get("<%=conn%>");
|
|
<%
|
|
} else {
|
|
%>
|
|
dbschema_<%=cid%> = <%=dbschema%>;
|
|
String driverClass_<%=cid%> = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
|
|
<%dbLog.conn().logJDBCDriver(dbLog.var("driverClass"));%>
|
|
java.lang.Class.forName(driverClass_<%=cid%>);
|
|
String port_<%=cid%> = <%=dbport%>;
|
|
String dbname_<%=cid%> = <%=dbname%> ;
|
|
String url_<%=cid %> = "jdbc:sqlserver://" + <%=dbhost%> ;
|
|
if (!"".equals(port_<%=cid%>)) {
|
|
url_<%=cid %> += ":" + <%=dbport%>;
|
|
}
|
|
if (!"".equals(dbname_<%=cid%>)) {
|
|
url_<%=cid%> += ";database=" + <%=dbname%>;
|
|
}
|
|
url_<%=cid%> += ";appName=" + projectName + ";" + <%=dbproperties%>;
|
|
dbUser_<%=cid %> = <%=dbuser%>;
|
|
|
|
<%
|
|
passwordFieldName = "__PASS__";
|
|
%>
|
|
|
|
<%@ include file="@{org.talend.designer.components.localprovider}/components/templates/password.javajet"%>
|
|
|
|
String dbPwd_<%=cid %> = decryptedPassword_<%=cid%>;
|
|
<%dbLog.conn().connTry(dbLog.var("url"), dbLog.var("dbUser"));%>
|
|
conn_<%=cid%> = java.sql.DriverManager.getConnection(url_<%=cid %>,dbUser_<%=cid%>,dbPwd_<%=cid%>);
|
|
<%dbLog.conn().connDone(dbLog.var("url"));%>
|
|
<%
|
|
}
|
|
%>
|
|
|
|
// END CONNECTION Code
|
|
|
|
<%
|
|
if(!useExistingConnection) {
|
|
%>
|
|
resourceMap.put("conn_<%=cid%>", conn_<%=cid%>);
|
|
conn_<%=cid%>.setAutoCommit(true);
|
|
<%
|
|
}
|
|
dbLog.commit().logAutoCommit("conn_"+cid+".getAutoCommit()");
|
|
%>
|
|
// Resolve the target table name. With a non-blank schema the name is built as schema + "].[" + table,
// so that the "[" ... "]" wrapping applied by the SQL statements below yields [schema].[table].
if (dbschema_<%=cid%> != null && dbschema_<%=cid%>.trim().length() > 0) {
    tableName_<%=cid%> = dbschema_<%=cid%> + "].[" + <%=table%>;
} else {
    tableName_<%=cid%> = <%=table%>;
}
// The temporary external table shares the target name, suffixed with the random identifier.
ext_tableName_<%=cid%> = tableName_<%=cid%> + "_" + randomIdentifier_<%=cid%>;
|
|
<%
|
|
if(isEnabledExtTableOptions){
|
|
%>
|
|
String externalTableRejectOptions_<%=cid%> = "REJECT_TYPE = <%=rejectType%>" + ",REJECT_VALUE = " + <%=rejectValue%>;
|
|
<%
|
|
if("Percentage".equals(rejectType)){
|
|
%>
|
|
externalTableRejectOptions_<%=cid%> += ",REJECT_SAMPLE_VALUE = " + <%=rejectSampleValue%>;
|
|
<%
|
|
}
|
|
}
|
|
|
|
if(("DROP_CREATE").equals(tableAction) || ("CREATE").equals(tableAction) || ("CREATE_IF_NOT_EXISTS").equals(tableAction) || ("DROP_IF_EXISTS_AND_CREATE").equals(tableAction)) {
|
|
|
|
// DW Table Options
|
|
if("CLUSTERED_COLUMNSTORE_INDEX".equals(TableOption)){
|
|
%>
|
|
dwTableOptions_<%=cid%> = "CLUSTERED COLUMNSTORE INDEX, ";
|
|
<%
|
|
}else if("HEAP".equals(TableOption)){
|
|
%>
|
|
dwTableOptions_<%=cid%> = "HEAP, ";
|
|
<%
|
|
}else if("CLUSTERED_INDEX".equals(TableOption)){
|
|
%>
|
|
dwTableOptions_<%=cid%> = "CLUSTERED INDEX (" + <%=IndexColumns%> + "), ";
|
|
<%
|
|
}
|
|
// DW Table Options END
|
|
|
|
// DW Distribution Options
|
|
if("ROUND_ROBIN".equals(DistributionOption)){
|
|
%>
|
|
dwDistributionOptions_<%=cid%> = "DISTRIBUTION = ROUND_ROBIN";
|
|
<%
|
|
}else if("REPLICATE".equals(DistributionOption)){
|
|
%>
|
|
dwDistributionOptions_<%=cid%> = "DISTRIBUTION = REPLICATE";
|
|
<%
|
|
}else if("HASH".equals(DistributionOption)){
|
|
%>
|
|
dwDistributionOptions_<%=cid%> = "DISTRIBUTION = HASH(" + <%=DistributionColumnName%> + ")";
|
|
<%
|
|
}
|
|
// DW Distribution Options END
|
|
|
|
// DW Table Partition
|
|
if(isEnabled_Partition){
|
|
%>
|
|
dwPartition_<%=cid%> = ", PARTITION ( [" + <%=PartitionColumnName%> + "] RANGE <%=PartitionRange%> FOR VALUES (" + <%=PartitionValues%> + ") )";
|
|
<%
|
|
}
|
|
// DW Table Partition END
|
|
|
|
}
|
|
|
|
if(columnList != null && columnList.size()>0) {
|
|
if(!isDynamic) {
|
|
%>
|
|
int count_<%=cid%>=0;
|
|
<%
|
|
manager = getManager(dbmsId, cid);
|
|
String ending="";
|
|
%>
|
|
//Create a database scoped credential using Azure Access Key
//IDENTITY: Provide any string, it is not used for authentication to Azure storage.
//SECRET: Provide your Azure storage account key.

//Both values end up inside single-quoted T-SQL literals, so every ' is doubled:
//otherwise a key or identity containing a quote would end the literal early and break (or alter) the statement.
//A null key is passed through unchanged so the statement text stays what it was before.
String escapedCredIdentity_<%=cid%> = azureCredIdentity_<%=cid%>.replace("'", "''");
String escapedAccessKey_<%=cid%> = azureAccessKey_<%=cid%> == null ? null : azureAccessKey_<%=cid%>.replace("'", "''");

java.sql.Statement stmtCreateDBCredentials_<%=cid%> = conn_<%=cid%>.createStatement();
try {
    stmtCreateDBCredentials_<%=cid%>.execute("CREATE DATABASE SCOPED CREDENTIAL " + azureCredName_<%=cid%> + " WITH IDENTITY = '" + escapedCredIdentity_<%=cid%> + "', SECRET = '" + escapedAccessKey_<%=cid%> + "'");
} finally {
    //Release the statement even when the CREATE fails.
    stmtCreateDBCredentials_<%=cid%>.close();
}
|
|
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Created Database Scoped Credential: " + azureCredName_<%=cid%> + " WITH IDENTITY " + azureCredIdentity_<%=cid%> + " successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
|
|
//Create an external data source
|
|
//TYPE: HADOOP - PolyBase uses Hadoop APIs to access data in Azure blob storage.
|
|
//LOCATION: Provide Azure storage account name and blob container name.
|
|
//CREDENTIAL: Provide the credential created in the previous step.
|
|
|
|
java.sql.Statement stmtCreateExtDataSource_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
|
|
<%
|
|
if ("ADLS".equals(storageType)) {
|
|
%>
|
|
stmtCreateExtDataSource_<%=cid%>.execute("CREATE EXTERNAL DATA SOURCE " + azureExtDataSrc_<%=cid%> + " WITH (TYPE=HADOOP, LOCATION = 'adl://" + azureAccountName_<%=cid%> + ".azuredatalakestore.net', CREDENTIAL = " + azureCredName_<%=cid%> + ")");
|
|
<%
|
|
} else {
|
|
%>
|
|
stmtCreateExtDataSource_<%=cid%>.execute("CREATE EXTERNAL DATA SOURCE " + azureExtDataSrc_<%=cid%> + " WITH (TYPE=HADOOP, LOCATION = 'wasbs://" + azureContainer_<%=cid%> + "@" + azureAccountName_<%=cid%> + ".blob.core.windows.net', CREDENTIAL = " + azureCredName_<%=cid%> + ")");
|
|
<%
|
|
}
|
|
%>
|
|
stmtCreateExtDataSource_<%=cid%>.close();
|
|
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Created External Data Source: " + azureExtDataSrc_<%=cid%> + " WITH Credential " + azureCredName_<%=cid%> + " successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
//newCode END
|
|
|
|
StringBuilder cmdExtFileFormat_<%=cid%> = new StringBuilder();
|
|
java.sql.Statement stmtCreateExtFileFormat_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
|
|
cmdExtFileFormat_<%=cid%>.append("CREATE EXTERNAL FILE FORMAT ").append(azureExtFileFormat_<%=cid%>).append(" WITH (FORMAT_TYPE = <%=fileFormatType%>");
|
|
<%
|
|
if("DELIMITED_TEXT".equals(loadFileFormat)){
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(", FORMAT_OPTIONS (FIELD_TERMINATOR = '").append(OUT_DELIM_<%=cid %>).append("'");
|
|
cmdExtFileFormat_<%=cid%>.append(", USE_TYPE_DEFAULT = <%=isTypeDefault%>");
|
|
<%
|
|
if(useStringDelimiter){
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(", STRING_DELIMITER = '").append(<%=stringDelimiter%>).append("'");
|
|
<%
|
|
}
|
|
if(dateFormat){
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(", DATE_FORMAT = '").append(<%=datePattern%>).append("'");
|
|
<%
|
|
}
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(")");
|
|
<%
|
|
} else if("HIVE_RCFILE".equals(loadFileFormat)){
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(", SERDE_METHOD = '<%=serdeMethod%>'");
|
|
<%
|
|
}
|
|
|
|
if (compressionType != null){
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(", DATA_COMPRESSION = '<%=compressionType%>'");
|
|
<%
|
|
}
|
|
%>
|
|
cmdExtFileFormat_<%=cid%>.append(")");
|
|
stmtCreateExtFileFormat_<%=cid%>.execute(cmdExtFileFormat_<%=cid%>.toString());
|
|
stmtCreateExtFileFormat_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Created External File Format: " + azureExtFileFormat_<%=cid%> + " successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
//CREATE EXTERNAL TABLE
|
|
java.sql.Statement stmtCreateExt_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().createTry(dbLog.var("ext_tableName"));%>
|
|
stmtCreateExt_<%=cid%>.execute("<%=manager.getCreateTableSQL(stmtStructure)%>) WITH (LOCATION='/ " + azureFolder_<%=cid%> + "', DATA_SOURCE=" + azureExtDataSrc_<%=cid%> + ", FILE_FORMAT= " + azureExtFileFormat_<%=cid%>
|
|
<%
|
|
if(isEnabledExtTableOptions){
|
|
%>
|
|
+ "," + externalTableRejectOptions_<%=cid%>
|
|
<%
|
|
}
|
|
%>
|
|
+ ")");
|
|
<%dbLog.table().createDone(dbLog.var("ext_tableName"));%>
|
|
stmtCreateExt_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Created External Table : [" + ext_tableName_<%=cid%> + "] successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
// CREATE DW TABLE AND / OR INSERT INTO DW TABLE
|
|
try{
|
|
<%
|
|
if(!("NONE").equals(tableAction)) {
|
|
if(("DROP_CREATE").equals(tableAction)) {
|
|
%>
|
|
java.sql.Statement stmtDrop_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().dropTry(dbLog.var("tableName"));%>
|
|
stmtDrop_<%=cid%>.execute("DROP TABLE [" + tableName_<%=cid%> + "]");
|
|
<%dbLog.table().dropDone(dbLog.var("tableName"));%>
|
|
stmtDrop_<%=cid%>.close();
|
|
|
|
java.sql.Statement stmtCreate_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().createTry(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.execute("CREATE TABLE [" + tableName_<%=cid%> + "] WITH (" + dwTableOptions_<%=cid%> + dwDistributionOptions_<%=cid%> + dwPartition_<%=cid%> + ") AS SELECT * FROM [" + ext_tableName_<%=cid%> + "] WHERE 1=2");
|
|
<%dbLog.table().createDone(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.close();
|
|
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
} else if(("CREATE").equals(tableAction)) {
|
|
%>
|
|
java.sql.Statement stmtCreate_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().createTry(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.execute("CREATE TABLE [" + tableName_<%=cid%> + "] WITH (" + dwTableOptions_<%=cid%> + dwDistributionOptions_<%=cid%> + dwPartition_<%=cid%> + ") AS SELECT * FROM [" + ext_tableName_<%=cid%> + "] WHERE 1=2");
|
|
<%dbLog.table().createDone(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.close();
|
|
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
} else if(("CREATE_IF_NOT_EXISTS").equals(tableAction) || ("DROP_IF_EXISTS_AND_CREATE").equals(tableAction)) {
|
|
boolean tableNameCaseSensitive=false;
|
|
%>
|
|
//check if table exists
|
|
java.sql.Statement isExistStmt_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
boolean whetherExist_<%=cid%> = false;
|
|
|
|
try {
|
|
isExistStmt_<%=cid%>.execute("SELECT TOP 1 1 FROM [" + tableName_<%=cid%> + "]" );
|
|
whetherExist_<%=cid%> = true;
|
|
} catch (java.lang.Exception e){
|
|
whetherExist_<%=cid%> = false;
|
|
}
|
|
isExistStmt_<%=cid%>.close();
|
|
<%
|
|
if(("CREATE_IF_NOT_EXISTS").equals(tableAction)) {
|
|
%>
|
|
if(!whetherExist_<%=cid%>) {
|
|
java.sql.Statement stmtCreate_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().createTry(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.execute("CREATE TABLE [" + tableName_<%=cid%> + "] WITH (" + dwTableOptions_<%=cid%> + dwDistributionOptions_<%=cid%> + dwPartition_<%=cid%> + ") AS SELECT * FROM [" + ext_tableName_<%=cid%> + "] WHERE 1=2");
|
|
<%dbLog.table().createDone(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.close();
|
|
}
|
|
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
} else {
|
|
%>
|
|
if(whetherExist_<%=cid%>) {
|
|
java.sql.Statement stmtDrop_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().dropTry(dbLog.var("tableName"));%>
|
|
stmtDrop_<%=cid%>.execute("DROP TABLE [" + tableName_<%=cid%> + "]");
|
|
<%dbLog.table().dropDone(dbLog.var("tableName"));%>
|
|
stmtDrop_<%=cid%>.close();
|
|
}
|
|
java.sql.Statement stmtCreate_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().createTry(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.execute("CREATE TABLE [" + tableName_<%=cid%> + "] WITH (" + dwTableOptions_<%=cid%> + dwDistributionOptions_<%=cid%> + dwPartition_<%=cid%> + ") AS SELECT * FROM [" + ext_tableName_<%=cid%> + "] WHERE 1=2");
|
|
<%dbLog.table().createDone(dbLog.var("tableName"));%>
|
|
stmtCreate_<%=cid%>.close();
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
|
|
}
|
|
} else if(("CLEAR").equals(tableAction)) {
|
|
%>
|
|
java.sql.Statement stmtClear_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().clearTry(dbLog.var("tableName"));%>
|
|
stmtClear_<%=cid%>.executeUpdate("<%=manager.getDeleteTableSQL()%>");
|
|
<%dbLog.table().clearDone(dbLog.var("tableName"));%>
|
|
stmtClear_<%=cid%>.close();
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
} else if(("TRUNCATE").equals(tableAction)) {
|
|
%>
|
|
java.sql.Statement stmtTruncCount_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
java.sql.ResultSet rsTruncCount_<%=cid%> = stmtTruncCount_<%=cid%>.executeQuery("<%=manager.getSelectionSQL()%>");
|
|
int rsTruncCountNumber_<%=cid%> = 0;
|
|
if(rsTruncCount_<%=cid%>.next()) {
|
|
rsTruncCountNumber_<%=cid%> = rsTruncCount_<%=cid%>.getInt(1);
|
|
}
|
|
rsTruncCount_<%=cid%>.close();
|
|
stmtTruncCount_<%=cid%>.close();
|
|
java.sql.Statement stmtTrunc_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%dbLog.table().truncateTry(dbLog.var("tableName"));%>
|
|
stmtTrunc_<%=cid%>.executeUpdate("<%=manager.getTruncateTableSQL()%>");
|
|
<%dbLog.table().truncateDone(dbLog.var("tableName"));%>
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Number of Rows Truncated : " + rsTruncCountNumber_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
%>
|
|
stmtTrunc_<%=cid%>.close();
|
|
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
}
|
|
}else {
|
|
%>
|
|
java.sql.Statement stmtInsert_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Starting INSERT into table : [" + tableName_<%=cid%> + "].");
|
|
<%
|
|
}
|
|
%>
|
|
int insertedCount_<%=cid%> = stmtInsert_<%=cid%>.executeUpdate("INSERT INTO [" + tableName_<%=cid%> + "] SELECT * FROM [" + ext_tableName_<%=cid%> + "]");
|
|
globalMap.put("<%=cid%>_NB_LINE_INSERTED",insertedCount_<%=cid%>);
|
|
stmtInsert_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - INSERT into table : [" + tableName_<%=cid%> + "] completed successfully.");
|
|
log.info("<%=cid%> - Number of Rows Inserted : " + insertedCount_<%=cid%> + ".");
|
|
<%
|
|
}
|
|
}
|
|
%>
|
|
} catch (java.sql.SQLException e_<%=cid%>) {
<%
if(isLog4jEnabled) {
%>
    log.error("<%=cid%> - " + e_<%=cid%>.getMessage());
<%
}
%>
    // Best-effort removal of the temporary PolyBase objects, in reverse order of creation.
    // Each step is isolated: a failing DROP is attached to the original exception as a
    // suppressed exception instead of replacing it, and the remaining steps still run.

    try {
        java.sql.Statement stmtDropExtTable_<%=cid%> = conn_<%=cid%>.createStatement();
        try {
            stmtDropExtTable_<%=cid%>.execute("DROP EXTERNAL TABLE [" + ext_tableName_<%=cid%> + "]");
        } finally {
            stmtDropExtTable_<%=cid%>.close();
        }
<%
if(isLog4jEnabled) {
%>
        log.info("<%=cid%> - Dropped External Table : [" + ext_tableName_<%=cid%> + "] successfully.");
<%
}
%>
    } catch (java.sql.SQLException cleanupEx_<%=cid%>) {
        e_<%=cid%>.addSuppressed(cleanupEx_<%=cid%>);
    }

    try {
        java.sql.Statement stmtDropExtFileFormat_<%=cid%> = conn_<%=cid%>.createStatement();
        try {
            stmtDropExtFileFormat_<%=cid%>.execute("DROP EXTERNAL FILE FORMAT [" + azureExtFileFormat_<%=cid%> + "]");
        } finally {
            stmtDropExtFileFormat_<%=cid%>.close();
        }
<%
if(isLog4jEnabled) {
%>
        log.info("<%=cid%> - Dropped External File Format : [" + azureExtFileFormat_<%=cid%> + "] successfully.");
<%
}
%>
    } catch (java.sql.SQLException cleanupEx_<%=cid%>) {
        e_<%=cid%>.addSuppressed(cleanupEx_<%=cid%>);
    }

    try {
        java.sql.Statement stmtDropExtDataSrc_<%=cid%> = conn_<%=cid%>.createStatement();
        try {
            stmtDropExtDataSrc_<%=cid%>.execute("DROP EXTERNAL DATA SOURCE [" + azureExtDataSrc_<%=cid%> + "]");
        } finally {
            stmtDropExtDataSrc_<%=cid%>.close();
        }
<%
if(isLog4jEnabled) {
%>
        log.info("<%=cid%> - Dropped External Data Source : [" + azureExtDataSrc_<%=cid%> + "] successfully.");
<%
}
%>
    } catch (java.sql.SQLException cleanupEx_<%=cid%>) {
        e_<%=cid%>.addSuppressed(cleanupEx_<%=cid%>);
    }

    try {
        java.sql.Statement stmtDropDatabaseCred_<%=cid%> = conn_<%=cid%>.createStatement();
        try {
            stmtDropDatabaseCred_<%=cid%>.execute("DROP DATABASE SCOPED CREDENTIAL [" + azureCredName_<%=cid%> + "]");
        } finally {
            stmtDropDatabaseCred_<%=cid%>.close();
        }
<%
if(isLog4jEnabled) {
%>
        log.info("<%=cid%> - Dropped Database Scoped Credential : [" + azureCredName_<%=cid%> + "] successfully.");
<%
}
%>
    } catch (java.sql.SQLException cleanupEx_<%=cid%>) {
        e_<%=cid%>.addSuppressed(cleanupEx_<%=cid%>);
    }

    // Always surface the failure that interrupted the load itself.
    throw e_<%=cid%>;
}
|
|
|
|
java.sql.Statement stmtDropExtTable_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
stmtDropExtTable_<%=cid%>.execute("DROP EXTERNAL TABLE [" + ext_tableName_<%=cid%> + "]");
|
|
stmtDropExtTable_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Dropped External Table : [" + ext_tableName_<%=cid%> + "] successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
java.sql.Statement stmtDropExtFileFormat_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
stmtDropExtFileFormat_<%=cid%>.execute("DROP EXTERNAL FILE FORMAT [" + azureExtFileFormat_<%=cid%> + "]");
|
|
stmtDropExtFileFormat_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Dropped External File Format : [" + azureExtFileFormat_<%=cid%> + "] successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
java.sql.Statement stmtDropExtDataSrc_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
stmtDropExtDataSrc_<%=cid%>.execute("DROP EXTERNAL DATA SOURCE [" + azureExtDataSrc_<%=cid%> + "]");
|
|
stmtDropExtDataSrc_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Dropped External Data Source : [" + azureExtDataSrc_<%=cid%> + "] successfully.");
|
|
<%
|
|
}
|
|
%>
|
|
|
|
java.sql.Statement stmtDropDatabaseCred_<%=cid%> = conn_<%=cid%>.createStatement();
|
|
stmtDropDatabaseCred_<%=cid%>.execute("DROP DATABASE SCOPED CREDENTIAL [" + azureCredName_<%=cid%> + "]");
|
|
stmtDropDatabaseCred_<%=cid%>.close();
|
|
<%
|
|
if(isLog4jEnabled) {
|
|
%>
|
|
log.info("<%=cid%> - Dropped Database Scoped Credential : [" + azureCredName_<%=cid%> + "] successfully.");
|
|
<%
|
|
}
|
|
} //end isDynamic
|
|
|
|
if (useExistingConnection) {
    // Name of the connection component this node reuses.
    String connection = ElementParameterParser.getValue(node,"__CONNECTION__");
%>
// True only when the shared connection stored Boolean.TRUE under its shareIdentitySetting key;
// a missing (null) or non-Boolean entry yields false, with a single map lookup and no unboxing.
boolean isShareIdentity_<%=cid%> = Boolean.TRUE.equals(globalMap.get("shareIdentitySetting_<%=connection%>"));
<%
}
|
|
}
|
|
|
|
if(!useExistingConnection) {
|
|
%>
|
|
conn_<%=cid%>.close();
|
|
<%
|
|
|
|
}
|
|
|
|
%>
|
|
|
|
|