*/
package fr.gouv.finances.dgfip.xemelios.batch.imports;
-import fr.gouv.finances.dgfip.xemelios.batch.Batch;
-import fr.gouv.finances.dgfip.xemelios.batch.BatchRunner;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOFull;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOPK;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOWrapper;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.XemImportsFilesVOFull;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.XemImportsFilesVOWrapper;
-import fr.gouv.finances.dgfip.xemelios.common.Constants;
-import fr.gouv.finances.dgfip.xemelios.common.PJRef;
-import fr.gouv.finances.dgfip.xemelios.common.config.DocumentsModel;
-import fr.gouv.finances.dgfip.xemelios.data.impl.pool.PoolManager;
-import fr.gouv.finances.dgfip.xemelios.utils.FileUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipFile;
+
import org.apache.log4j.Logger;
+import fr.gouv.finances.dgfip.xemelios.batch.Batch;
+import fr.gouv.finances.dgfip.xemelios.batch.BatchRunner;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOFull;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOPK;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOWrapper;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.XemImportsFilesVOFull;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.XemImportsFilesVOWrapper;
+import fr.gouv.finances.dgfip.xemelios.common.Constants;
+import fr.gouv.finances.dgfip.xemelios.common.PJRef;
+import fr.gouv.finances.dgfip.xemelios.common.config.DocumentsModel;
+import fr.gouv.finances.dgfip.xemelios.data.impl.pool.PoolManager;
+import fr.gouv.finances.dgfip.xemelios.utils.FileUtils;
+
/**
* Ce batch vérifie que toutes les conditions sont bien remplies pour réaliser un import
+ *
* @author chm
*/
public class ImportChecker extends Batch {
- private final static Logger logger = Logger.getLogger(ImportChecker.class);
- public static final int MAX_CONCURRENT_THREADS = 3;
- private int maxConcurrentThreads = MAX_CONCURRENT_THREADS;
-
- private String importDirectory = null;
+ private final static Logger logger = Logger.getLogger(ImportChecker.class);
+ public static final int MAX_CONCURRENT_THREADS = 3;
+ private int maxConcurrentThreads = MAX_CONCURRENT_THREADS;
+
+ private String importDirectory = null;
+
+ private ThreadPool<ImportCheckerImpl> pool = null;
+
+ public ImportChecker(final String[] args) {
+ super(args);
+ }
+
+ @Override
+ protected void initialize() throws Exception {
+ super.initialize();
+ final String sTmp = this.getProps().getProperty("max.concurrent.threads");
+ if (sTmp != null && sTmp.length() > 0) {
+ try {
+ this.maxConcurrentThreads = Integer.parseInt(sTmp);
+ } catch (final Throwable t) {
+ }
+ }
+ }
+
+ @Override
+ protected void doProcess() throws Exception {
+ Connection con = null;
+ Statement st = null;
+ ResultSet rs = null;
+ final ArrayList<ImportCheckerImpl> threads = new ArrayList<ImportCheckerImpl>();
+ try {
+ con = PoolManager.getInstance().getConnection();
+ final String sqlParam = "SELECT PARAM_VALUE FROM PARAMETERS WHERE PARAM_NAME='import.directory'";
+ st = con.createStatement();
+ rs = st.executeQuery(sqlParam);
+ if (rs.next()) {
+ this.importDirectory = rs.getString(1);
+ rs.close();
+ } else {
+ throw new Exception("parameter 'import.directory' not set !");
+ }
+ final String sql = "SELECT IMPORT_ID FROM XEM_IMPORTS WHERE STATUS=0 AND CLEANED IS NULL";
+ rs = st.executeQuery(sql);
+ while (rs.next()) {
+ threads.add(new ImportCheckerImpl(rs.getString(1), this.importDirectory, BatchRunner.getInstance().getDocuments()));
+ }
+ PoolManager.getInstance().releaseConnection(con);
+ con = null;
+ this.pool = new ThreadPool<ImportCheckerImpl>(threads);
+ this.pool.start(this.maxConcurrentThreads);
+ } catch (final SQLException sqlEx) {
+ // TODO
+ } finally {
+ if (con != null) {
+ PoolManager.getInstance().releaseConnection(con);
+ }
+ }
+ }
+
+ @Override
+ public String getResumeTraitement() {
+ return "import.checker resume";
+ }
- private ThreadPool<ImportCheckerImpl> pool = null;
+ @Override
+ public String typeTraitementRefCode() {
+ return "import.checker";
+ }
- public ImportChecker(String[] args) {
- super(args);
- }
+ @Override
+ public String getInformations() {
+ final StringBuilder sb = new StringBuilder();
+ if (this.isStarted()) {
+ sb.append(this.typeTraitementRefCode()).append(" - ").append(this.pool.getRunningThreadCount()).append(" running thread(s)\n");
+ for (final RunnableBatch rb : this.pool.getThreads()) {
+ sb.append("\t").append(rb.getName()).append(": ").append(rb.isAlive() ? "running" : "idle").append("\n");
+ if (rb.isAlive()) {
+ sb.append(rb.getInformations());
+ }
+ }
+ } else {
+ sb.append(this.typeTraitementRefCode()).append(" - not started\n");
+ }
+ return sb.toString();
+ }
- @Override
- protected void initialize() throws Exception {
- super.initialize();
- String sTmp = getProps().getProperty("max.concurrent.threads");
- if(sTmp!=null && sTmp.length()>0) {
- try {
- maxConcurrentThreads = Integer.parseInt(sTmp);
- } catch(Throwable t) {}
- }
- }
+ @Override
+ protected String getBatchVersion() {
+ return "1.0";
+ }
- @Override
- protected void doProcess() throws Exception {
- Connection con = null;
- Statement st = null;
- ResultSet rs = null;
- ArrayList<ImportCheckerImpl> threads = new ArrayList<ImportCheckerImpl>();
- try {
- con = PoolManager.getInstance().getConnection();
- String sqlParam = "SELECT PARAM_VALUE FROM PARAMETERS WHERE PARAM_NAME='import.directory'";
- st = con.createStatement();
- rs = st.executeQuery(sqlParam);
- if(rs.next()) {
- importDirectory = rs.getString(1);
- rs.close();
- } else {
- throw new Exception("parameter 'import.directory' not set !");
- }
- String sql = "SELECT IMPORT_ID FROM XEM_IMPORTS WHERE STATUS=0 AND CLEANED IS NULL";
- rs = st.executeQuery(sql);
- while(rs.next()) {
- threads.add(new ImportCheckerImpl(rs.getString(1),importDirectory,BatchRunner.getInstance().getDocuments()));
- }
- PoolManager.getInstance().releaseConnection(con); con=null;
- pool = new ThreadPool<ImportCheckerImpl>(threads);
- pool.start(maxConcurrentThreads);
- } catch(SQLException sqlEx) {
- // TODO
- } finally {
- if(con!=null) PoolManager.getInstance().releaseConnection(con);
- }
- }
+ protected class ImportCheckerImpl extends RunnableBatch {
+ private final Logger logger = Logger.getLogger(ImportCheckerImpl.class);
+ private final String importId;
+ private ImportVOFull importFull;
+ private List<XemImportsFilesVOFull> files;
+ // utilities
+ private final File importDirectory;
+ private final DocumentsModel documents;
- @Override
- public String getResumeTraitement() {
- return "import.checker resume";
- }
+ public ImportCheckerImpl(final String importId, final String importDirectory, final DocumentsModel documents) {
+ this.setName("importChecker<" + importId + ">");
+ this.importId = importId;
+ this.importDirectory = new File(importDirectory);
+ this.documents = documents;
+ this.setName("ImportCheckerImpl<" + importId + ">");
+ }
- @Override
- public String typeTraitementRefCode() {
- return "import.checker";
- }
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ Connection con = null;
+ try {
+ con = PoolManager.getInstance().getConnection();
+ this.loadImportInfos(con);
+ this.importFull.setStatus(1);
+ ImportVOWrapper.updateImportVO(con, this.importFull);
+ PoolManager.getInstance().releaseConnection(con);
+ con = null;
+ this.checkFile();
+ con = PoolManager.getInstance().getConnection();
+ ImportVOWrapper.updateImportVO(con, this.importFull);
+ } catch (final SQLException sqlEx) {
+ this.logger.error("run<" + this.importId + ">()", sqlEx);
+ } finally {
+ if (con != null) {
+ PoolManager.getInstance().releaseConnection(con);
+ }
+ this.getNotifier().notifyThreadStop(this);
+ }
+ }
- @Override
- public String getInformations() {
- StringBuilder sb = new StringBuilder();
- if(isStarted()) {
- sb.append(typeTraitementRefCode()).append(" - ").append(pool.getRunningThreadCount()).append(" running thread(s)\n");
- for(RunnableBatch rb: pool.getThreads()) {
- sb.append("\t").append(rb.getName()).append(": ").append(rb.isAlive()?"running":"idle").append("\n");
- if(rb.isAlive()) sb.append(rb.getInformations());
- }
- } else {
- sb.append(typeTraitementRefCode()).append(" - not started\n");
- }
- return sb.toString();
- }
+ protected boolean checkFile() {
+ final File[] ret = new File[1];
+ ret[0] = new File(new File(this.importDirectory, this.importId), this.importFull.getFileName());
+ final ImportContent ic = new ImportContent();
+ for (final File element : ret) {
+ if (element.getName().toLowerCase().endsWith(".zip")) {
+ if (element.exists()) {
+ ZipFile zf = null;
+ try {
+ zf = new ZipFile(element);
+ for (final Enumeration<? extends ZipEntry> enumer = zf.entries(); enumer.hasMoreElements();) {
+ final ZipEntry ze = enumer.nextElement();
+ if (!ze.isDirectory()) {
+ String fileName = ze.getName();
+ final String entryName = fileName.toLowerCase();
+ fileName = fileName.replace(File.pathSeparatorChar, '_').replace(File.separatorChar, '_').replace(':', '|').replace('\'', '_').replace('/', '_');
+ this.logger.debug(entryName);
+ if (PJRef.isPJ(ze)) {
+ final PJRef pj = new PJRef(ze);
+ pj.writeTmpFile(FileUtils.getTempDir(), zf);
+ ic.pjs.add(pj);
+ } else if ((entryName.endsWith(this.documents.getDocumentById(this.importFull.getFileType()).getExtension().toLowerCase()) || entryName.endsWith(".xml")) && !fileName.startsWith("_")) {
+ // on decompresse le fichier dans le
+ // repertoire temporaire, comme ca il sera
+ // supprime en quittant
+ final InputStream is = zf.getInputStream(ze);
+ final BufferedInputStream bis = new BufferedInputStream(is);
+ final File output = new File(FileUtils.getTempDir(), fileName);
+ final BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(output));
+ final byte[] buffer = new byte[1024];
+ int read = bis.read(buffer);
+ while (read > 0) {
+ bos.write(buffer, 0, read);
+ read = bis.read(buffer);
+ }
+ bos.flush();
+ bos.close();
+ bis.close();
+ ic.filesToImport.add(output);
+ }
+ }
+ }
+ zf.close();
+ } catch (final ZipException zEx) {
+ final String errorMessage = "Le fichier " + element.getName() + " n'est pas une archive ZIP valide";
+ this.importFull.setErrorMessage(errorMessage);
+ } catch (final IOException ioEx) {
+ final String errorMessage = "Le fichier " + element.getName() + " est illisible. Vérifiez que les noms des fichiers\ncontenus dans l'archive ne comportent pas de caractères spéciaux ou accentués.";
+ this.importFull.setErrorMessage(errorMessage);
+ } finally {
+ if (zf != null) {
+ try {
+ zf.close();
+ } catch (final Throwable t) {
+ }
+ }
+ }
+ }
+ } else if (element.getName().toLowerCase().endsWith(".gz")) {
+ try {
+ String fileName = element.getName();
+ fileName = fileName.substring(0, fileName.length() - 3);
+ final File output = new File(FileUtils.getTempDir(), fileName);
+ final GZIPInputStream gis = new GZIPInputStream(new FileInputStream(element));
+ final BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(output));
+ final byte[] buffer = new byte[1024];
+ int read = gis.read(buffer);
+ while (read > 0) {
+ bos.write(buffer, 0, read);
+ read = gis.read(buffer);
+ }
+ bos.flush();
+ bos.close();
+ gis.close();
+ ic.filesToImport.add(output);
+ } catch (final IOException ioEx) {
+ // nothing to do
+ }
+ } else {
+ ic.filesToImport.add(element);
+ }
+ }
+ if (ic.filesToImport.size() == 1) {
+ if (Constants.XEMELIOS_ARCHIVE_SIGN.equals(this.importFull.getFileType())) {
+ // Traitement des archives
+ this.importFull.setStatus(6);
+ } else {
+ // Traitement des autres fichiers
+ boolean needToLookForCollBudg = false;
+ final StringBuffer errors = new StringBuffer();
+ if (this.importFull.getCollectivite() == null || this.importFull.getCollectivite().length() == 0) {
+ needToLookForCollBudg = true;
+ }
+ if (this.importFull.getCollectiviteLib() == null || this.importFull.getCollectiviteLib().length() == 0) {
+ needToLookForCollBudg = true;
+ }
+ if (this.importFull.getBudget() == null || this.importFull.getBudget().length() == 0) {
+ needToLookForCollBudg = true;
+ }
+ if (this.importFull.getBudgetLib() == null || this.importFull.getBudgetLib().length() == 0) {
+ needToLookForCollBudg = true;
+ }
+ if (needToLookForCollBudg) {
+ final BudgCollParser bcp = new BudgCollParser(ic.filesToImport.get(0), this.documents.getDocumentById(this.importFull.getFileType()));
+ bcp.investigate();
+ if (bcp.isInError()) {
+ this.importFull.setErrorMessage(bcp.getErrorMessage());
+ this.importFull.setStatus(5);
+ } else {
+ if (this.importFull.getCollectivite() == null || this.importFull.getCollectivite().length() == 0) {
+ this.importFull.setCollectivite(bcp.getCollectivite());
+ }
+ if (this.importFull.getCollectiviteLib() == null || this.importFull.getCollectiviteLib().length() == 0) {
+ this.importFull.setCollectiviteLib(bcp.getCollectiviteLib());
+ }
+ if (this.importFull.getBudget() == null || this.importFull.getBudget().length() == 0) {
+ this.importFull.setBudget(bcp.getBudget());
+ }
+ if (this.importFull.getBudgetLib() == null || this.importFull.getBudgetLib().length() == 0) {
+ this.importFull.setBudgetLib(bcp.getBudgetLib());
+ }
+ this.importFull.determinateStatus();
+ }
+ }
+ }
+ } else {
+ // TODO: multi sub files
+ }
+ // TODO
+ return false;
+ }
- @Override
- protected String getBatchVersion() {
- return "1.0";
- }
+ @SuppressWarnings("unchecked")
+ public void loadImportInfos(final Connection con) throws SQLException {
+ final ImportVOPK pk = new ImportVOPK(this.importId);
+ this.importFull = ImportVOWrapper.getImportVOFullByPk(con, pk);
+ this.files = XemImportsFilesVOWrapper.getAllXemImportsFilesVOFullBy(con, XemImportsFilesVOFull.class, "SELECT " + XemImportsFilesVOWrapper.getSelectFieldsClause() + " FROM XEM_IMPORTS_FILES WHERE IMPORT_ID='" + this.importId + "'");
+ }
- protected class ImportCheckerImpl extends RunnableBatch {
- private final Logger logger = Logger.getLogger(ImportCheckerImpl.class);
- private String importId;
- private ImportVOFull importFull;
- private List<XemImportsFilesVOFull> files;
- // utilities
- private File importDirectory;
- private DocumentsModel documents;
- public ImportCheckerImpl(String importId,String importDirectory, DocumentsModel documents) {
- setName("importChecker<"+importId+">");
- this.importId=importId;
- this.importDirectory = new File(importDirectory);
- this.documents=documents;
- setName("ImportCheckerImpl<"+importId+">");
- }
+ @Override
+ public String getInformations() {
+ if (this.isAlive()) {
+ return "\t".concat(this.getName()).concat(": running\n");
+ } else {
+ return "\t".concat(this.getName()).concat(": idle\n");
+ }
+ }
+ }
- @Override
- @SuppressWarnings("unchecked")
- public void run() {
- Connection con = null;
- try {
- con = PoolManager.getInstance().getConnection();
- loadImportInfos(con);
- importFull.setStatus(1);
- ImportVOWrapper.updateImportVO(con, importFull);
- PoolManager.getInstance().releaseConnection(con); con=null;
- checkFile();
- con = PoolManager.getInstance().getConnection();
- ImportVOWrapper.updateImportVO(con, importFull);
- } catch(SQLException sqlEx) {
- logger.error("run<"+importId+">()", sqlEx);
- } finally {
- if(con!=null) PoolManager.getInstance().releaseConnection(con);
- getNotifier().notifyThreadStop(this);
- }
- }
+ public static class ImportContent {
+ ArrayList<File> filesToImport;
+ ArrayList<PJRef> pjs;
- protected boolean checkFile() {
- File[] ret = new File[1];
- ret[0] = new File(new File(importDirectory,importId),importFull.getFileName());
- ImportContent ic = new ImportContent();
- for (int i = 0; i < ret.length; i++) {
- if (ret[i].getName().toLowerCase().endsWith(".zip")) {
- if (ret[i].exists()) {
- ZipFile zf = null;
- try {
- zf = new ZipFile(ret[i]);
- for (Enumeration<? extends ZipEntry> enumer = zf.entries(); enumer.hasMoreElements();) {
- ZipEntry ze = enumer.nextElement();
- if (!ze.isDirectory()) {
- String fileName = ze.getName();
- String entryName = fileName.toLowerCase();
- fileName = fileName.replace(
- File.pathSeparatorChar, '_').replace(
- File.separatorChar, '_').replace(':',
- '|').replace('\'', '_').replace('/',
- '_');
- logger.debug(entryName);
- if(PJRef.isPJ(ze)) {
- PJRef pj = new PJRef(ze);
- pj.writeTmpFile(FileUtils.getTempDir(),zf);
- ic.pjs.add(pj);
- } else if ((entryName.endsWith(documents.getDocumentById(importFull.getFileType()).getExtension().toLowerCase()) || entryName.endsWith(".xml")) && !fileName.startsWith("_")) {
- // on decompresse le fichier dans le
- // repertoire temporaire, comme ca il sera
- // supprime en quittant
- InputStream is = zf.getInputStream(ze);
- BufferedInputStream bis = new BufferedInputStream(is);
- File output = new File(FileUtils.getTempDir(), fileName);
- BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(output));
- byte[] buffer = new byte[1024];
- int read = bis.read(buffer);
- while (read > 0) {
- bos.write(buffer, 0, read);
- read = bis.read(buffer);
- }
- bos.flush();
- bos.close();
- bis.close();
- ic.filesToImport.add(output);
- }
- }
- }
- zf.close();
- } catch (ZipException zEx) {
- String errorMessage = "Le fichier "+ret[i].getName()+ " n'est pas une archive ZIP valide";
- importFull.setErrorMessage(errorMessage);
- } catch (IOException ioEx) {
- String errorMessage = "Le fichier "+ret[i].getName()+" est illisible. Vérifiez que les noms des fichiers\ncontenus dans l'archive ne comportent pas de caractères spéciaux ou accentués.";
- importFull.setErrorMessage(errorMessage);
- } finally {
- if(zf!=null) {
- try { zf.close(); } catch(Throwable t) {}
- }
- }
- }
- } else if(ret[i].getName().toLowerCase().endsWith(".gz")) {
- try {
- String fileName = ret[i].getName();
- fileName = fileName.substring(0, fileName.length()-3);
- File output = new File(FileUtils.getTempDir(), fileName);
- GZIPInputStream gis = new GZIPInputStream(new FileInputStream(ret[i]));
- BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(output));
- byte[] buffer = new byte[1024];
- int read = gis.read(buffer);
- while (read > 0) {
- bos.write(buffer, 0, read);
- read = gis.read(buffer);
- }
- bos.flush();
- bos.close();
- gis.close();
- ic.filesToImport.add(output);
- } catch(IOException ioEx) {
- // nothing to do
- }
- } else {
- ic.filesToImport.add(ret[i]);
- }
- }
- if(ic.filesToImport.size()==1) {
- if(Constants.XEMELIOS_ARCHIVE_SIGN.equals(importFull.getFileType())) {
- // Traitement des archives
- importFull.setStatus(6);
- } else {
- // Traitement des autres fichiers
- boolean needToLookForCollBudg = false;
- StringBuffer errors = new StringBuffer();
- if(importFull.getCollectivite()==null || importFull.getCollectivite().length()==0) {
- needToLookForCollBudg = true;
- }
- if(importFull.getCollectiviteLib()==null || importFull.getCollectiviteLib().length()==0) {
- needToLookForCollBudg = true;
- }
- if(importFull.getBudget()==null || importFull.getBudget().length()==0) {
- needToLookForCollBudg = true;
- }
- if(importFull.getBudgetLib()==null || importFull.getBudgetLib().length()==0) {
- needToLookForCollBudg = true;
- }
- if(needToLookForCollBudg) {
- BudgCollParser bcp = new BudgCollParser(ic.filesToImport.get(0), documents.getDocumentById(importFull.getFileType()));
- bcp.investigate();
- if(bcp.isInError()) {
- importFull.setErrorMessage(bcp.getErrorMessage());
- importFull.setStatus(5);
- } else {
- if(importFull.getCollectivite()==null || importFull.getCollectivite().length()==0) {
- importFull.setCollectivite(bcp.getCollectivite());
- }
- if(importFull.getCollectiviteLib()==null || importFull.getCollectiviteLib().length()==0) {
- importFull.setCollectiviteLib(bcp.getCollectiviteLib());
- }
- if(importFull.getBudget()==null || importFull.getBudget().length()==0) {
- importFull.setBudget(bcp.getBudget());
- }
- if(importFull.getBudgetLib()==null || importFull.getBudgetLib().length()==0) {
- importFull.setBudgetLib(bcp.getBudgetLib());
- }
- importFull.determinateStatus();
- }
- }
- }
- } else {
- // TODO: multi sub files
- }
- // TODO
- return false;
- }
- @SuppressWarnings("unchecked")
- public void loadImportInfos(Connection con) throws SQLException {
- ImportVOPK pk = new ImportVOPK(importId);
- importFull = ImportVOWrapper.getImportVOFullByPk(con, pk);
- files = XemImportsFilesVOWrapper.getAllXemImportsFilesVOFullBy(con, XemImportsFilesVOFull.class, "SELECT "+XemImportsFilesVOWrapper.getSelectFieldsClause()+" FROM XEM_IMPORTS_FILES WHERE IMPORT_ID='"+importId+"'");
- }
+ public ImportContent() {
+ super();
+ this.filesToImport = new ArrayList<File>();
+ this.pjs = new ArrayList<PJRef>();
+ }
- @Override
- public String getInformations() {
- if(isAlive()) return "\t".concat(getName()).concat(": running\n");
- else return "\t".concat(getName()).concat(": idle\n");
- }
- }
- public static class ImportContent {
- ArrayList<File> filesToImport;
- ArrayList<PJRef> pjs;
- public ImportContent() {
- super();
- filesToImport = new ArrayList<File>();
- pjs = new ArrayList<PJRef>();
- }
- public void setFilesToImport(ArrayList<File> array){
- this.filesToImport=array;
- }
- }
+ public void setFilesToImport(final ArrayList<File> array) {
+ this.filesToImport = array;
+ }
+ }
}