package com.megatim.fdxconsultation.core.impl.camel.helper;
|
|
import com.megatim.fdxcommons.model.dataproduction.CommonDataProduction;
|
import com.megatim.fdxcommons.model.dataproduction.DataProductionType;
|
import com.megatim.fdxcommons.model.dataproduction.metadata.ProductionMetaData;
|
import com.megatim.fdxcommons.model.enumeration.StatutDataProduction;
|
import com.megatim.fdxcommons.model.enumeration.TypeDonnee;
|
import com.megatim.fdxcommons.model.integration.ColumnDefinition;
|
import com.megatim.fdxcommons.model.integration.TableDefinition;
|
import com.megatim.fdxcommons.tools.database.exceptions.BadDataValueException;
|
import com.megatim.fdxcommons.tools.database.exceptions.LocalDateTimeValueParseError;
|
import com.megatim.fdxcommons.tools.database.queries.CustomQueries;
|
import com.megatim.fdxcommons.tools.database.queries.InsertDataProductionInDataProductionToDeleteQuery;
|
import com.megatim.fdxcommons.tools.database.queries.UpdateDataProductionQuery;
|
import com.megatim.fdxcommons.tools.database.queries.metadata.InsertionLocalDateTimeValue;
|
import com.megatim.fdxcommons.tools.database.tables.FdxConsultationTable;
|
import com.megatim.fdxcommons.tools.database.tables.FdxTableColumnData;
|
import com.megatim.fdxcommons.tools.database.tables.FdxTableRow;
|
import com.megatim.fdxcommons.tools.database.tables.appcolumns.DataProductionIdColumnDefinition;
|
import com.megatim.fdxcommons.tools.database.tables.appcolumns.DataProductionUpdateIdColumnDefinition;
|
import com.megatim.fdxcommons.tools.database.tables.appcolumns.DateCreationColumnDefinition;
|
import com.megatim.fdxcommons.tools.database.tables.appcolumns.DateMiseAJourColumnDefinition;
|
import com.megatim.fdxcommons.tools.database.tables.appcolumns.FdxConsultationIndexColumnDefinition;
|
import com.megatim.fdxconsultation.core.ifaces.dataproduction.metadata.ProductionMetaDataManager;
|
import com.megatim.fdxconsultation.core.ifaces.helper.TableInsert;
|
import com.megatim.fdxconsultation.core.ifaces.integration.ColumnDefinitionManager;
|
import com.megatim.fdxconsultation.core.impl.factory.FdxConsultationTableFactory;
|
import java.io.BufferedReader;
|
import java.io.File;
|
import java.io.FileInputStream;
|
import java.io.IOException;
|
import java.io.InputStreamReader;
|
import java.math.BigDecimal;
|
import java.nio.charset.Charset;
|
import java.sql.Connection;
|
import java.sql.SQLException;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import javax.naming.NamingException;
|
import org.apache.commons.csv.CSVFormat;
|
import org.apache.commons.csv.CSVParser;
|
import org.apache.commons.csv.CSVRecord;
|
import org.apache.commons.text.StringEscapeUtils;
|
import org.apache.commons.validator.GenericValidator;
|
|
/**
|
*
|
* @author Gabuntu
|
*/
|
public class FileInsertionProcess {
|
|
private final ColumnDefinitionManager columnDefinitionManager;
|
private final CommonDataProduction dataProduction;
|
private final Connection connection;
|
private final File dataFile;
|
private final int BATCH_SIZE = 100_000;
|
private final Charset charset;
|
private final TableInsert tableInsert;
|
private final ProductionMetaDataManager metaDataManager;
|
|
public FileInsertionProcess(ColumnDefinitionManager columnDefinitionManager, CommonDataProduction dataProduction, Connection connection, File dataFile, Charset charset, TableInsert tableInsert, ProductionMetaDataManager metaDataManager) {
|
this.columnDefinitionManager = columnDefinitionManager;
|
this.dataProduction = dataProduction;
|
this.connection = connection;
|
this.dataFile = dataFile;
|
this.charset = charset;
|
this.tableInsert = tableInsert;
|
this.metaDataManager = metaDataManager;
|
}
|
|
public void process() throws IOException, SQLException, NamingException, Exception {
|
System.out.println("--------------------In FileInsertionProcess-----------------");
|
ProductionMetaData metaData = metaDataManager.getById(dataProduction.getId());
|
|
if (metaData != null) {
|
TableDefinition tableDefinition = tableDefinition(dataProduction.getCodeTypeFichier(), dataProduction.getReferentielVersion());
|
|
if (metaData.getDataProductionType().equals(DataProductionType.UPDATE.toString())) {
|
List<Long> indexes = new CustomQueries().indexes(metaData.getProductionId(), connection);
|
|
if (!indexes.isEmpty()) {
|
if (tableDefinition.getHeaderPresent() != null && tableDefinition.getColumnDelimiter() != null && tableDefinition.getLineDelimiter() != null) {
|
insertDataFromCsvFileToDatabase(metaData, indexes, tableDefinition);
|
} else {
|
insertDataFromFileToDatabase(metaData, indexes, tableDefinition.getColumnDefinitions());
|
}
|
dataProduction.setStatutDataProduction(StatutDataProduction.A_CONSOMMER);
|
new UpdateDataProductionQuery(dataProduction, connection).execute();
|
} else {
|
dataProduction.setStatutDataProduction(StatutDataProduction.CONSOMME);
|
new UpdateDataProductionQuery(dataProduction, connection).execute();
|
}
|
} else if (metaData.getDataProductionType().equals(DataProductionType.ADD.toString())) {
|
System.out.println("-----------------In DataProductionType.ADD--------------------");
|
if (tableDefinition.getHeaderPresent() != null && tableDefinition.getColumnDelimiter() != null && tableDefinition.getLineDelimiter() != null) {
|
insertDataFromCsvFileToDatabase(metaData, null, tableDefinition);
|
} else {
|
insertDataFromFileToDatabase(metaData, null, tableDefinition.getColumnDefinitions());
|
}
|
dataProduction.setStatutDataProduction(StatutDataProduction.A_CONSOMMER);
|
new UpdateDataProductionQuery(dataProduction, connection).execute();
|
|
FdxConsultationTable consTable = FdxConsultationTableFactory.createTable(dataProduction.getCodeTypeFichier(), dataProduction.getReferentielVersion(), connection);
|
|
if (consTable.tableDefinition(connection).isReferentiel()) {
|
if (dataProduction.getId() != null) {
|
new InsertDataProductionInDataProductionToDeleteQuery(connection, dataProduction.getId()).execute();
|
}
|
}
|
}
|
} else {
|
dataProduction.setStatutDataProduction(StatutDataProduction.CONSOMME);
|
new UpdateDataProductionQuery(dataProduction, connection).execute();
|
}
|
|
}
|
|
private void insertDataFromFileToDatabase(ProductionMetaData metaData, List<Long> indexes, List<ColumnDefinition> columnDefinitions) throws BadDataValueException, Exception, IOException {
|
List<FdxTableRow> data = new ArrayList<>(BATCH_SIZE);
|
List<FdxTableColumnData> appColumns = appColumns();
|
final int lineLength = columnDefinitions.stream().map(c -> c.getTaille()).reduce(0, Integer::sum);
|
int lineNumber = 1;
|
String line;
|
final Long startIndex = metaData.getStartIndex();
|
int index = lineNumber - 1;
|
Long fdxIndex = metaData.getDataProductionType().equals(DataProductionType.ADD.toString()) ? startIndex : indexes.get(index);
|
|
System.out.println("------------------startIndex = " + startIndex + ", nbreElts = " + metaData.getNbreElts());
|
|
try ( FileInputStream fis = new FileInputStream(dataFile); InputStreamReader isr = new InputStreamReader(fis, charset); BufferedReader reader = new BufferedReader(isr)) {
|
while ((line = reader.readLine()) != null) {
|
|
if (line.trim().isEmpty() || line.length() != lineLength) {
|
throw new BadDataValueException("La taille de la ligne " + lineNumber + " est différente de la taille attendue");
|
}
|
data.add(getLine(line, lineNumber, columnDefinitions, appColumns, fdxIndex));
|
|
if ((lineNumber % BATCH_SIZE) == 0) {
|
insert(data);
|
}
|
|
lineNumber++;
|
index = lineNumber - 1;
|
fdxIndex = metaData.getDataProductionType().equals(DataProductionType.ADD.toString()) ? startIndex + index : indexes.get(index);
|
}
|
|
dataProduction.setStatutDataProduction(StatutDataProduction.A_CONSOMMER);
|
insert(data);
|
}
|
System.out.println("-----------LastIndex = " + fdxIndex);
|
}
|
|
private int insertDataFromCsvFileToDatabase(ProductionMetaData metaData, List<Long> indexes, TableDefinition tableDefinition) throws BadDataValueException, Exception, IOException {
|
List<FdxTableRow> data = new ArrayList<>(BATCH_SIZE);
|
List<FdxTableColumnData> appColumns = appColumns();
|
int lineNumber = 1;
|
final Long startIndex = metaData.getStartIndex();
|
int index = lineNumber - 1;
|
Long fdxIndex = metaData.getDataProductionType().equals(DataProductionType.ADD.toString()) ? startIndex : indexes.get(index);
|
|
try ( FileInputStream fis = new FileInputStream(dataFile); InputStreamReader isr = new InputStreamReader(fis, charset); BufferedReader reader = new BufferedReader(isr)) {
|
List<ColumnDefinition> columnDefinitions = tableDefinition.getColumnDefinitions();
|
AtomicBoolean withHeader = new AtomicBoolean(tableDefinition.getHeaderPresent());
|
|
CSVParser parser = new CSVParser(reader,
|
CSVFormat.DEFAULT.builder()
|
.setSkipHeaderRecord(withHeader.get())
|
.setIgnoreSurroundingSpaces(true)
|
.setTrim(true)
|
.setDelimiter(StringEscapeUtils.unescapeJava(tableDefinition.getColumnDelimiter()))
|
.setRecordSeparator(StringEscapeUtils.unescapeJava(tableDefinition.getLineDelimiter()))
|
.build());
|
List<CSVRecord> records = parser.getRecords();
|
|
for (CSVRecord record : records) {
|
|
String[] columnsTableLine = new String[record.size()];
|
AtomicInteger i = new AtomicInteger(0);
|
|
if (!withHeader.get()) {
|
record.forEach(column -> {
|
columnsTableLine[i.getAndIncrement()] = column != null ? column.replaceAll(" {2,}", " ").replaceAll("\r", "").replaceAll("\n", "").replaceAll("\t", "") : "";
|
});
|
if (columnsTableLine.length != columnDefinitions.size()) {
|
throw new Exception("Le nombre de colonnes de la ligne " + lineNumber + " est différent du nombre de colonnes attendu");
|
}
|
data.add(getLine(columnsTableLine, lineNumber, columnDefinitions, appColumns, fdxIndex));
|
|
if ((lineNumber % BATCH_SIZE) == 0) {
|
insert(data);
|
}
|
lineNumber++;
|
} else {
|
withHeader.set(false);
|
}
|
index = lineNumber - 1;
|
fdxIndex = metaData.getDataProductionType().equals(DataProductionType.ADD.toString()) ? startIndex + index : indexes.get(index);
|
}
|
}
|
|
if (!data.isEmpty()) {
|
insert(data);
|
}
|
return lineNumber - 1;
|
}
|
|
private void insert(List<FdxTableRow> data) throws Exception {
|
tableInsert.insert(dataProduction, data, connection);
|
data.clear();
|
}
|
|
private TableDefinition tableDefinition(String codeTypeFichier, String referentielVersion) throws SQLException, NamingException {
|
FdxConsultationTable consTable = FdxConsultationTableFactory.createTable(codeTypeFichier, referentielVersion, connection);
|
return consTable.tableDefinition(connection).tableDefinition();
|
}
|
|
private FdxTableRow getLine(final String line, final int lineNumber, final List<ColumnDefinition> columnsDefinition, List<FdxTableColumnData> appColumns, Long fdxIndex) throws BadDataValueException, LocalDateTimeValueParseError {
|
int index = 0;
|
List<FdxTableColumnData> dataColumns = new ArrayList<>();
|
|
for (ColumnDefinition c : columnsDefinition) {
|
//traitement des colonnes du type fichier
|
String data = line.substring(index, index + c.getTaille());
|
FdxTableColumnData columnData = new FdxTableColumnData(parse(c, data), c.getName(), c.getPosition());
|
dataColumns.add(columnData);
|
index += c.getTaille();
|
}
|
dataColumns.addAll(appColumns);
|
dataColumns.add(new FdxTableColumnData(fdxIndex, new FdxConsultationIndexColumnDefinition().name(), 0));
|
|
return new FdxTableRow(lineNumber, dataColumns);
|
}
|
|
private FdxTableRow getLine(String[] columnsTableLine, final int lineNumber, final List<ColumnDefinition> columnsDefinition, List<FdxTableColumnData> appColumns, Long fdxIndex) throws BadDataValueException, LocalDateTimeValueParseError {
|
List<FdxTableColumnData> dataColumns = new ArrayList<>();
|
|
for (ColumnDefinition c : columnsDefinition) {
|
//traitement des colonnes du type fichier
|
String data = columnsTableLine[c.getPosition() - 1];
|
FdxTableColumnData columnData = new FdxTableColumnData(parse(c, data), c.getName(), c.getPosition());
|
dataColumns.add(columnData);
|
}
|
dataColumns.addAll(appColumns);
|
dataColumns.add(new FdxTableColumnData(fdxIndex, new FdxConsultationIndexColumnDefinition().name(), 0));
|
|
return new FdxTableRow(lineNumber, dataColumns);
|
}
|
|
private List<FdxTableColumnData> appColumns() {
|
List<FdxTableColumnData> dataColumns = new ArrayList<>();
|
|
if (dataProduction.getDataProductionType().equals(DataProductionType.UPDATE)) {
|
dataColumns.add(new FdxTableColumnData(dataProduction.getDateMiseAJour(), new DateMiseAJourColumnDefinition().name(), 0));
|
dataColumns.add(new FdxTableColumnData(dataProduction.getId(), new DataProductionUpdateIdColumnDefinition().name(), 0));
|
|
} else if (dataProduction.getDataProductionType().equals(DataProductionType.ADD)) {
|
dataColumns.add(new FdxTableColumnData(dataProduction.getId(), new DataProductionIdColumnDefinition().name(), 0));
|
}
|
dataColumns.add(new FdxTableColumnData(dataProduction.getDateProduction(), new DateCreationColumnDefinition().name(), 0));
|
return dataColumns;
|
}
|
|
private Object parse(ColumnDefinition columnDefinition, String value) throws BadDataValueException, LocalDateTimeValueParseError {
|
|
String columnType = columnDefinition.getTypeDonnee();
|
|
if (columnType.equals(TypeDonnee.ALPHANUMERIQUE.toString()) && (value == null
|
|| (value instanceof String && lengthLowerOrEquals(((String) value).trim(), columnDefinition.getTaille())))) {
|
|
return value != null ? value.trim() : value;
|
|
} else if (columnType.equals(TypeDonnee.NUMERIQUE.toString())) {
|
|
if (value == null || value.trim().isEmpty()) {
|
return null;
|
}
|
|
String stringValue = value.trim()
|
.replaceAll("\\s+", "")
|
.replaceAll("\\u00A0", "")//nbsp
|
.replaceAll(" ", "");
|
if (lengthLowerOrEquals(stringValue, columnDefinition.getTaille()) && GenericValidator.isLong(stringValue)) {
|
return Long.valueOf(stringValue);
|
}
|
|
} else if (columnType.equals(TypeDonnee.DECIMAL.toString())) {
|
|
if (value == null || value.trim().isEmpty()) {
|
return null;
|
}
|
String stringValue = value
|
.trim()
|
.replaceAll("\\s+", "")
|
.replaceAll("\\u00A0", "")//nbsp
|
.replaceAll(" ", "");
|
|
if (lengthLowerOrEquals(stringValue, columnDefinition.getTaille() + columnDefinition.getTaillePartieDecimale() + 1)) {
|
try {
|
return new BigDecimal(stringValue);
|
} catch (Exception ex) {
|
}
|
}
|
|
} else if (columnType.equals(TypeDonnee.DATE.toString())) {
|
|
if (value == null || value.trim().isEmpty()) {
|
return null;
|
}
|
String stringValue = value.trim()
|
.replaceAll("\\s+", "")
|
.replaceAll("\\u00A0", "")//nbsp
|
.replaceAll(" ", "");
|
|
if (lengthEquals(stringValue, columnDefinition.getTaille()) && GenericValidator.isDate(stringValue, columnDefinition.getFormatDate(), true)) {
|
return new InsertionLocalDateTimeValue(columnDefinition.getFormatDate(), value).value();
|
}
|
}
|
|
throw new BadDataValueException("La valeur " + value + " ne correspond pas au format attendu pour la colonne " + columnDefinition.getName());
|
}
|
|
private boolean lengthEquals(String value, int columnMaxLength) {
|
return value.trim().length() == columnMaxLength;
|
}
|
|
private boolean lengthLowerOrEquals(String value, int columnMaxLength) {
|
return value.trim().length() <= columnMaxLength;
|
}
|
}
|