feat: File processing.

This commit is contained in:
Piotr Dec 2024-07-18 02:51:18 +02:00
parent f0b7096865
commit 2025cca693
Signed by: stawros
GPG key ID: F89F27AD8F881A91
9 changed files with 162 additions and 0 deletions

View file

@@ -4,10 +4,12 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableConfigurationProperties
@ConfigurationPropertiesScan
@EnableAsync
public class Main {
public static void main(String[] args) {

View file

@@ -0,0 +1,66 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.FileProcessingException;
import eu.ztsh.lfr.model.DataRow;
import eu.ztsh.lfr.model.events.FileModifiedEvent;
import eu.ztsh.lfr.model.Temperatures;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
/**
 * Loads the temperature data file and turns it into per-city/per-year averages.
 *
 * Listens for {@link FileModifiedEvent}, re-reads the whole file pointed at by
 * {@link DataProperties#fileUrl()}, and publishes a {@link FileProcessedEvent}
 * carrying the freshly computed averages. Runs asynchronously so the publisher
 * of the modification event is never blocked by the (potentially long) parse.
 */
@Service
public class FileLoadingService {

    private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class);

    private final DataProperties dataProperties;
    private final ApplicationEventPublisher eventPublisher;

    @Autowired
    public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) {
        this.dataProperties = dataProperties;
        this.eventPublisher = eventPublisher;
    }

    /**
     * Re-parses the data file and publishes the resulting averages.
     *
     * @throws FileProcessingException if the file cannot be read or a line is malformed
     */
    @Async
    @EventListener(FileModifiedEvent.class)
    public void onFileModified() {
        var startedAt = System.currentTimeMillis();
        log.debug("Processing file");
        final Temperatures model = new Temperatures();
        // Files.lines must be closed explicitly; try-with-resources handles it.
        try (final Stream<String> lines = Files.lines(Paths.get(dataProperties.fileUrl()))) {
            lines.parallel()
                .filter(Predicate.not(String::isBlank))
                .map(this::parseLine)
                .forEach(model::addData);
        } catch (IOException e) {
            throw new FileProcessingException("File error", e);
        }
        var millis = System.currentTimeMillis() - startedAt;
        // Elapsed time reported as minutes:seconds; seconds argument is lazy.
        log.atInfo().setMessage("Elapsed time: {}:{}")
            .addArgument(TimeUnit.MILLISECONDS.toMinutes(millis))
            .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60))
            .log();
        eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages()));
    }

    /**
     * Parses one semicolon-separated line: {@code city;date;temperature}.
     * Only the leading year of the second field (split on '-') is kept.
     *
     * @throws FileProcessingException wrapping any parse failure, with the offending line
     */
    private DataRow parseLine(String line) {
        try {
            var fields = line.split(";");
            var year = Integer.parseInt(fields[1].split("-")[0]);
            return new DataRow(fields[0], year, Double.parseDouble(fields[2]));
        } catch (Exception e) {
            throw new FileProcessingException("Error in line %s".formatted(line), e);
        }
    }
}

View file

@@ -0,0 +1,13 @@
package eu.ztsh.lfr.model;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Maps a city name to its list of per-year {@link Average} entries.
 *
 * NOTE(review): extending {@code HashMap} exposes the entire mutable Map API to
 * callers; composition over an unmodifiable map would be safer, but changing
 * the supertype would break existing callers, so the shape is kept as-is.
 */
public class Averages extends HashMap<String, List<Average>> {

    /**
     * Creates the averages map as a copy of the given mapping.
     *
     * @param m source mapping of city name to yearly averages; entries are
     *          copied, so later changes to {@code m} are not reflected here
     */
    public Averages(Map<String, List<Average>> m) {
        super(m);
    }
}

View file

@@ -0,0 +1,5 @@
package eu.ztsh.lfr.model;
/**
 * One parsed measurement from the data file.
 *
 * @param city        city name (first semicolon-separated field)
 * @param year        measurement year (leading part of the date field)
 * @param temperature measured temperature (third field)
 */
public record DataRow(String city, int year, double temperature) {
}

View file

@@ -0,0 +1,9 @@
package eu.ztsh.lfr.model;
/**
 * Unchecked exception signalling that the data file could not be read or that
 * one of its lines could not be parsed. The original failure is preserved as
 * the cause.
 */
public class FileProcessingException extends RuntimeException {

    /**
     * @param message context describing what failed (e.g. the offending line)
     * @param cause   the underlying I/O or parse failure, never discarded
     */
    public FileProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}

View file

@@ -0,0 +1,33 @@
package eu.ztsh.lfr.model;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
 * Thread-safe accumulator of temperature samples, keyed by city and then year.
 * Populated concurrently (see the parallel stream in FileLoadingService) and
 * snapshotted into an {@link Averages} once loading is done.
 */
public class Temperatures extends ConcurrentHashMap<String, ConcurrentMap<Integer, YearDataAccumulator>> {

    /**
     * Records one measurement under its city/year bucket.
     *
     * Uses {@code computeIfAbsent}, which is atomic on ConcurrentHashMap and —
     * unlike the previous putIfAbsent+get pair — does not allocate a throwaway
     * map/accumulator on every call when the key already exists.
     *
     * @param dataRow the parsed measurement to record
     */
    public void addData(DataRow dataRow) {
        computeIfAbsent(dataRow.city(), city -> new ConcurrentHashMap<>())
            .computeIfAbsent(dataRow.year(), year -> new YearDataAccumulator())
            .add(dataRow.temperature());
    }

    /**
     * Snapshots the accumulated sums into per-city lists of yearly averages.
     *
     * @return a new {@link Averages} built from the current contents; not a
     *         live view — later {@link #addData} calls are not reflected
     */
    public Averages getAverages() {
        var computed = entrySet().stream()
            .map(entry -> {
                var perYear = entry.getValue().entrySet().stream()
                    .map(yearEntry -> {
                        var average = new Average();
                        average.setYear(yearEntry.getKey().toString());
                        average.setAverageTemperature(yearEntry.getValue().getAverage());
                        return average;
                    })
                    .toList();
                return Map.entry(entry.getKey(), perYear);
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        return new Averages(computed);
    }
}

View file

@@ -0,0 +1,25 @@
package eu.ztsh.lfr.model;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAdder;
/**
 * Accumulates temperature samples for one city/year and yields their average.
 *
 * Thread-safe for concurrent {@link #add} calls: DoubleAdder and AtomicInteger
 * are individually thread-safe, so the previous {@code synchronized} block was
 * redundant — and since {@link #getAverage} never took that lock, it provided
 * no sum/count pairwise atomicity either. Read the average only after loading
 * has finished.
 */
public class YearDataAccumulator {

    private final DoubleAdder sum = new DoubleAdder();
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Adds one sample. Safe to call from multiple threads.
     *
     * @param value the measured temperature
     */
    public void add(double value) {
        sum.add(value);
        counter.incrementAndGet();
    }

    /**
     * Average of all samples added so far, rounded to 2 decimal places.
     *
     * Divides first and rounds the QUOTIENT; the previous code rounded the sum
     * to 2 decimals before dividing, discarding precision prematurely.
     *
     * @return the average with scale 2, HALF_UP rounding
     * @throws ArithmeticException if no samples have been added (division by zero)
     */
    public BigDecimal getAverage() {
        return BigDecimal.valueOf(sum.sum())
            .divide(BigDecimal.valueOf(counter.get()), 2, RoundingMode.HALF_UP);
    }
}

View file

@@ -0,0 +1,7 @@
package eu.ztsh.lfr.model.events;
import eu.ztsh.lfr.model.Averages;
/**
 * Application event published after the data file has been fully parsed.
 *
 * @param averages the freshly computed per-city yearly averages
 */
public record FileProcessedEvent(Averages averages) {
}

View file

@@ -0,0 +1,2 @@
# Appends a single blank line to the data file. NOTE(review): presumably this
# exists to modify the file so a watcher fires FileModifiedEvent and the app
# reloads — TODO confirm; the path is relative to this script's directory.
with open('../data/temperatures.csv', 'a', encoding='utf-8') as target:
    target.write("\n")