package com.megatim.fdxconsultation.core.impl.dataproductionworker;
|
|
import com.megatim.fdxconsultation.model.dataproduction.DataProduction;
|
import java.util.HashMap;
|
import java.util.HashSet;
|
import java.util.Map;
|
import java.util.Set;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import javax.enterprise.context.ApplicationScoped;
|
import javax.inject.Inject;
|
|
/**
|
*
|
* @author Gabuntu
|
*/
|
@ApplicationScoped
|
public class DefaultDataProductionWorkerGroup implements DataProductionWorkerGroup {
|
|
@Inject
|
private DataProductionTaskFactory dataProductionTaskFactory;
|
|
private final int WORKER_NUMBER = 4;
|
private final DataProductionWorker[] workers = new DataProductionWorker[WORKER_NUMBER];
|
private final Map<Integer, Set<String>> workerToCodeTypeFichiers = new HashMap<>();
|
private final AtomicInteger currentWorkerIndex = new AtomicInteger(0);
|
|
@Override
|
public void start() {
|
for (int i = 0; i < WORKER_NUMBER; i++) {
|
workers[i] = new DataProductionWorker("Worker-" + i);
|
workers[i].start();
|
workerToCodeTypeFichiers.put(i, new HashSet<>());
|
}
|
}
|
|
@Override
|
public void shutdown() {
|
for (DataProductionWorker worker : workers) {
|
worker.shutdown();
|
}
|
workerToCodeTypeFichiers.clear();
|
}
|
|
@Override
|
public void addNewDataProductionTask(DataProduction dataProduction) {
|
DataProductionWorker worker = workerForTask(dataProduction.getCodeTypeFichier());
|
worker.addTask(dataProductionTaskFactory.createDataProductionTask(dataProduction));
|
}
|
|
private synchronized DataProductionWorker workerForTask(String codeTypeFichier) {
|
Integer workerIndex = workerLinkToCodeTypeFichier(codeTypeFichier);
|
if (workerIndex != null) {
|
return workers[workerIndex];
|
}
|
int currentIndex = currentWorkerIndex.getAndIncrement();
|
DataProductionWorker worker = workers[currentIndex];
|
|
workerToCodeTypeFichiers.get(currentIndex).add(codeTypeFichier);
|
|
changeCurrentWorkerIndex();
|
|
return worker;
|
}
|
|
private Integer workerLinkToCodeTypeFichier(String codeTypeFichier) {
|
return workerToCodeTypeFichiers
|
.entrySet()
|
.stream()
|
.filter(e -> e.getValue().contains(codeTypeFichier))
|
.findFirst()
|
.map(e -> e.getKey())
|
.orElse(null);
|
}
|
|
private void changeCurrentWorkerIndex() {
|
if ((currentWorkerIndex.get() % (WORKER_NUMBER - 1)) == 0) {
|
currentWorkerIndex.set(0);
|
}
|
}
|
|
}
|