*/
package fr.gouv.finances.dgfip.xemelios.batch.imports;
-import fr.gouv.finances.dgfip.xemelios.batch.Batch;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOFull;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOPK;
-import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOWrapper;
-import fr.gouv.finances.dgfip.xemelios.data.impl.pool.PoolManager;
-import fr.gouv.finances.dgfip.xemelios.data.utils.jdbc.JdbcUtils;
-import fr.gouv.finances.dgfip.xemelios.data.utils.jdbc.PStmtBinder;
import java.io.File;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.log4j.Logger;
+import fr.gouv.finances.dgfip.xemelios.batch.Batch;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOFull;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOPK;
+import fr.gouv.finances.dgfip.xemelios.batch.imports.utils.ImportVOWrapper;
+import fr.gouv.finances.dgfip.xemelios.data.impl.pool.PoolManager;
+import fr.gouv.finances.dgfip.xemelios.data.utils.jdbc.JdbcUtils;
+import fr.gouv.finances.dgfip.xemelios.data.utils.jdbc.PStmtBinder;
+
/**
- *
+ *
* @author chm
*/
public class PostImportCleaner extends Batch {
- public static final int MAX_CONCURRENT_THREADS = 3;
- private int maxConcurrentThreads = MAX_CONCURRENT_THREADS;
- private String importDirectory = null;
-
- private ThreadPool<PostImportCleanerImpl> pool = null;
-
- public PostImportCleaner(String args[]) {
- super(args);
- }
-
- @Override
- protected void initialize() throws Exception {
- super.initialize();
- String sTmp = getProps().getProperty("max.concurrent.threads");
- if(sTmp!=null && sTmp.length()>0) {
- try {
- maxConcurrentThreads = Integer.parseInt(sTmp);
- } catch(Throwable t) {}
- }
- }
-
-
-
- @Override
- protected void doProcess() throws Exception {
- Connection con = null;
- Statement st = null;
- ResultSet rs = null;
- ArrayList<PostImportCleanerImpl> threads = new ArrayList<PostImportCleanerImpl>();
- try {
- con = PoolManager.getInstance().getConnection();
- String sqlParam = "SELECT PARAM_VALUE FROM PARAMETERS WHERE PARAM_NAME='import.directory'";
- st = con.createStatement();
- rs = st.executeQuery(sqlParam);
- if(rs.next()) {
- importDirectory = rs.getString(1);
- rs.close();
- } else {
- throw new Exception("parameter 'import.directory' not set !");
- }
- String sql = "SELECT IMPORT_ID FROM XEM_IMPORTS WHERE STATUS IN (4,5) AND CLEANED IS NULL";
- rs = st.executeQuery(sql);
- while(rs.next()) {
- threads.add(new PostImportCleanerImpl(rs.getString(1),importDirectory));
- }
- PoolManager.getInstance().releaseConnection(con); con=null;
- pool = new ThreadPool<PostImportCleanerImpl>(threads);
- pool.start(maxConcurrentThreads);
- } catch(SQLException sqlEx) {
- // TODO
- } finally {
- if(con!=null) PoolManager.getInstance().releaseConnection(con);
- }
- }
-
- @Override
- public String getResumeTraitement() {
- return "post-import.cleaner.resume";
- }
-
- @Override
- public String typeTraitementRefCode() {
- return "post-import.cleaner";
-
- }
-
- @Override
- public String getInformations() {
- StringBuilder sb = new StringBuilder();
- if(isStarted()) {
- sb.append(typeTraitementRefCode()).append(" - ").append(pool.getRunningThreadCount()).append(" running thread(s)\n");
- for(RunnableBatch rb: pool.getThreads()) {
- sb.append("\t").append(rb.getName()).append(": ").append(rb.isAlive()?"running":"idle").append("\n");
- if(rb.isAlive()) sb.append(rb.getInformations());
- }
- } else {
- sb.append(typeTraitementRefCode()).append(" - not started\n");
- }
- return sb.toString();
- }
-
- @Override
- protected String getBatchVersion() {
- return "1.0";
- }
-
- private class PostImportCleanerImpl extends RunnableBatch {
- private final Logger logger = Logger.getLogger(PostImportCleanerImpl.class);
- private String importId;
- private String importDirectory;
- // utilities
- private ImportVOFull importFull;
-
- public PostImportCleanerImpl(String importId, String importDirectory) {
- this.importDirectory=importDirectory;
- this.importId=importId;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void run() {
- Connection con = null;
- try {
- con = PoolManager.getInstance().getConnection();
- loadImportInfos(con);
- File directory = new File(new File(importDirectory),importFull.getImportId());
- File sourceFile = new File(directory,importFull.getFileName());
- ArrayList<File> excludeFiles = new ArrayList<File>();
- if(importFull.getStatus()==5) {
- excludeFiles.add(sourceFile);
- }
- cleanDirectory(directory,excludeFiles);
- JdbcUtils.executeUpdate(con,"UPDATE XEM_IMPORTS SET CLEANED=1 WHERE IMPORT_ID=?",new PStmtBinder.SimplePStmtBinderBuilder().add(importId).toPStmtBinder());
- } catch(SQLException sqlEx) {
- logger.error("PostImportCleanerImpl.run()",sqlEx);
- } finally {
- if(con!=null) {
- PoolManager.getInstance().releaseConnection(con);
- }
- getNotifier().notifyThreadStop(this);
- }
- }
-
- private void cleanDirectory(File directory, List<File> excludes) {
- for(File f:directory.listFiles()) {
- if(excludes.contains(f)) continue;
- else if(f.isDirectory()) {
- cleanDirectory(f, excludes);
- } else {
- f.delete();
- }
- }
- if(directory.listFiles().length==0 && !excludes.contains(directory)) {
- directory.delete();
- }
- }
-
- @SuppressWarnings("unchecked")
- public void loadImportInfos(Connection con) throws SQLException {
- ImportVOPK pk = new ImportVOPK(importId);
- importFull = ImportVOWrapper.getImportVOFullByPk(con, pk);
-// files = XemImportsFilesVOWrapper.getAllXemImportsFilesVOFullBy(con, XemImportsFilesVOFull.class, "SELECT "+XemImportsFilesVOWrapper.getSelectFieldsClause()+" FROM XEM_IMPORTS_FILES WHERE IMPORT_ID='"+importId+"'");
- }
-
- @Override
- public String getInformations() {
- if(isAlive()) return "\t".concat(getName()).concat(": running\n");
- else return "\t".concat(getName()).concat(": idle\n");
- }
-
- }
-
+ public static final int MAX_CONCURRENT_THREADS = 3;
+ private int maxConcurrentThreads = MAX_CONCURRENT_THREADS;
+ private String importDirectory = null;
+
+ private ThreadPool<PostImportCleanerImpl> pool = null;
+
+ public PostImportCleaner(final String args[]) {
+ super(args);
+ }
+
+ @Override
+ protected void initialize() throws Exception {
+ super.initialize();
+ final String sTmp = this.getProps().getProperty("max.concurrent.threads");
+ if (sTmp != null && sTmp.length() > 0) {
+ try {
+ this.maxConcurrentThreads = Integer.parseInt(sTmp);
+ } catch (final Throwable t) {
+ }
+ }
+ }
+
+ @Override
+ protected void doProcess() throws Exception {
+ Connection con = null;
+ Statement st = null;
+ ResultSet rs = null;
+ final ArrayList<PostImportCleanerImpl> threads = new ArrayList<PostImportCleanerImpl>();
+ try {
+ con = PoolManager.getInstance().getConnection();
+ final String sqlParam = "SELECT PARAM_VALUE FROM PARAMETERS WHERE PARAM_NAME='import.directory'";
+ st = con.createStatement();
+ rs = st.executeQuery(sqlParam);
+ if (rs.next()) {
+ this.importDirectory = rs.getString(1);
+ rs.close();
+ } else {
+ throw new Exception("parameter 'import.directory' not set !");
+ }
+ final String sql = "SELECT IMPORT_ID FROM XEM_IMPORTS WHERE STATUS IN (4,5) AND CLEANED IS NULL";
+ rs = st.executeQuery(sql);
+ while (rs.next()) {
+ threads.add(new PostImportCleanerImpl(rs.getString(1), this.importDirectory));
+ }
+ PoolManager.getInstance().releaseConnection(con);
+ con = null;
+ this.pool = new ThreadPool<PostImportCleanerImpl>(threads);
+ this.pool.start(this.maxConcurrentThreads);
+ } catch (final SQLException sqlEx) {
+ // TODO
+ } finally {
+ if (con != null) {
+ PoolManager.getInstance().releaseConnection(con);
+ }
+ }
+ }
+
+ @Override
+ public String getResumeTraitement() {
+ return "post-import.cleaner.resume";
+ }
+
+ @Override
+ public String typeTraitementRefCode() {
+ return "post-import.cleaner";
+
+ }
+
+ @Override
+ public String getInformations() {
+ final StringBuilder sb = new StringBuilder();
+ if (this.isStarted()) {
+ sb.append(this.typeTraitementRefCode()).append(" - ").append(this.pool.getRunningThreadCount()).append(" running thread(s)\n");
+ for (final RunnableBatch rb : this.pool.getThreads()) {
+ sb.append("\t").append(rb.getName()).append(": ").append(rb.isAlive() ? "running" : "idle").append("\n");
+ if (rb.isAlive()) {
+ sb.append(rb.getInformations());
+ }
+ }
+ } else {
+ sb.append(this.typeTraitementRefCode()).append(" - not started\n");
+ }
+ return sb.toString();
+ }
+
+ @Override
+ protected String getBatchVersion() {
+ return "1.0";
+ }
+
+ private class PostImportCleanerImpl extends RunnableBatch {
+ private final Logger logger = Logger.getLogger(PostImportCleanerImpl.class);
+ private final String importId;
+ private final String importDirectory;
+ // utilities
+ private ImportVOFull importFull;
+
+ public PostImportCleanerImpl(final String importId, final String importDirectory) {
+ this.importDirectory = importDirectory;
+ this.importId = importId;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ Connection con = null;
+ try {
+ con = PoolManager.getInstance().getConnection();
+ this.loadImportInfos(con);
+ final File directory = new File(new File(this.importDirectory), this.importFull.getImportId());
+ final File sourceFile = new File(directory, this.importFull.getFileName());
+ final ArrayList<File> excludeFiles = new ArrayList<File>();
+ if (this.importFull.getStatus() == 5) {
+ excludeFiles.add(sourceFile);
+ }
+ this.cleanDirectory(directory, excludeFiles);
+ JdbcUtils.executeUpdate(con, "UPDATE XEM_IMPORTS SET CLEANED=1 WHERE IMPORT_ID=?", new PStmtBinder.SimplePStmtBinderBuilder().add(this.importId).toPStmtBinder());
+ } catch (final SQLException sqlEx) {
+ this.logger.error("PostImportCleanerImpl.run()", sqlEx);
+ } finally {
+ if (con != null) {
+ PoolManager.getInstance().releaseConnection(con);
+ }
+ this.getNotifier().notifyThreadStop(this);
+ }
+ }
+
+ private void cleanDirectory(final File directory, final List<File> excludes) {
+ for (final File f : directory.listFiles()) {
+ if (excludes.contains(f)) {
+ continue;
+ } else if (f.isDirectory()) {
+ this.cleanDirectory(f, excludes);
+ } else {
+ f.delete();
+ }
+ }
+ if (directory.listFiles().length == 0 && !excludes.contains(directory)) {
+ directory.delete();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void loadImportInfos(final Connection con) throws SQLException {
+ final ImportVOPK pk = new ImportVOPK(this.importId);
+ this.importFull = ImportVOWrapper.getImportVOFullByPk(con, pk);
+ // files = XemImportsFilesVOWrapper.getAllXemImportsFilesVOFullBy(con, XemImportsFilesVOFull.class,
+ // "SELECT "+XemImportsFilesVOWrapper.getSelectFieldsClause()+" FROM XEM_IMPORTS_FILES WHERE IMPORT_ID='"+importId+"'");
+ }
+
+ @Override
+ public String getInformations() {
+ if (this.isAlive()) {
+ return "\t".concat(this.getName()).concat(": running\n");
+ } else {
+ return "\t".concat(this.getName()).concat(": idle\n");
+ }
+ }
+
+ }
}