diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index 569eded..ca05716 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -4,10 +4,12 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication @EnableConfigurationProperties @ConfigurationPropertiesScan +@EnableAsync public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java new file mode 100644 index 0000000..844bdff --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -0,0 +1,66 @@ +package eu.ztsh.lfr.core.impl.files; + +import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.FileProcessingException; +import eu.ztsh.lfr.model.DataRow; +import eu.ztsh.lfr.model.events.FileModifiedEvent; +import eu.ztsh.lfr.model.Temperatures; +import eu.ztsh.lfr.model.events.FileProcessedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; + +@Service +public class FileLoadingService { + + private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class); + private final DataProperties dataProperties; + private final ApplicationEventPublisher eventPublisher; + + + @Autowired + public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { + this.dataProperties = dataProperties; + this.eventPublisher = eventPublisher; + } + + @Async + @EventListener(FileModifiedEvent.class) + public void onFileModified() { + var start = System.currentTimeMillis(); + log.debug("Processing file"); + final Temperatures model = new Temperatures(); + try (final Stream lines = Files.lines(Paths.get(dataProperties.fileUrl()))) { + lines.parallel() + .filter(Predicate.not(String::isBlank)) + .map(line -> { + try { + var parts = line.split(";"); + return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2])); + } catch (Exception e) { + throw new FileProcessingException("Error in line %s".formatted(line), e); + } + }) + .forEach(model::addData); + } catch (IOException e) { + throw new FileProcessingException("File error", e); + } + var millis = System.currentTimeMillis() - start; + log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) + .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) + .log(); + eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/Averages.java b/src/main/java/eu/ztsh/lfr/model/Averages.java new file mode 100644 index 0000000..118fe61 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/Averages.java @@ -0,0 +1,13 @@ +package eu.ztsh.lfr.model; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Averages extends HashMap> { + + public Averages(Map> m) { + super(m); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/DataRow.java b/src/main/java/eu/ztsh/lfr/model/DataRow.java new file mode 100644 index 0000000..43ac596 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/DataRow.java @@ -0,0 +1,5 @@ +package eu.ztsh.lfr.model; + +public record DataRow(String city, int year, double temperature) { + +} diff --git a/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java new file mode 100644 index 0000000..eb54b52 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java @@ -0,0 +1,9 @@ +package eu.ztsh.lfr.model; + +public class FileProcessingException extends RuntimeException { + + public FileProcessingException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/Temperatures.java b/src/main/java/eu/ztsh/lfr/model/Temperatures.java new file mode 100644 index 0000000..6916811 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/Temperatures.java @@ -0,0 +1,33 @@ +package eu.ztsh.lfr.model; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +public class Temperatures extends ConcurrentHashMap> { + + public void addData(DataRow dataRow) { + putIfAbsent(dataRow.city(), new ConcurrentHashMap<>()); + var city = get(dataRow.city()); + city.putIfAbsent(dataRow.year(), new YearDataAccumulator()); + city.get(dataRow.year()).add(dataRow.temperature()); + } + + public Averages getAverages() { + var computed = entrySet().stream() + .map(entry -> { + var newEntry = entry.getValue().entrySet().stream() + .map(yearEntry -> { + var average = new Average(); + average.setYear(yearEntry.getKey().toString()); + average.setAverageTemperature(yearEntry.getValue().getAverage()); + return average; + }) + .toList(); + return Map.entry(entry.getKey(), newEntry); + }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new Averages(computed); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java new file mode 100644 index 0000000..2b58474 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java @@ -0,0 +1,25 @@ +package eu.ztsh.lfr.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +public class YearDataAccumulator { + + private final DoubleAdder sum = new DoubleAdder(); + private final AtomicInteger counter = new AtomicInteger(); + + public void add(double value) { + synchronized (this) { + sum.add(value); + counter.incrementAndGet(); + } + } + + public BigDecimal getAverage() { + return BigDecimal.valueOf(sum.sum()).setScale(2, RoundingMode.HALF_UP) + .divide(BigDecimal.valueOf(counter.get()), RoundingMode.HALF_UP); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java new file mode 100644 index 0000000..9e5ab8f --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java @@ -0,0 +1,7 @@ +package eu.ztsh.lfr.model.events; + +import eu.ztsh.lfr.model.Averages; + +public record FileProcessedEvent(Averages averages) { + +} diff --git a/util/change-event-creator.py b/util/change-event-creator.py new file mode 100644 index 0000000..5512773 --- /dev/null +++ b/util/change-event-creator.py @@ -0,0 +1,2 @@ +with open('../data/temperatures.csv', 'a', encoding='utf-8') as target: + target.write("\n")