diff --git a/.gitignore b/.gitignore
index a471319..f9b4200 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,6 @@ target/
### IntelliJ IDEA ###
.idea/
*.iml
+
+# local files
+data/
diff --git a/pom.xml b/pom.xml
index 8abc045..9c00eb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
eu.ztsh
- spring-boot-template
- Spring Boot Standalone Template
- 1.0.0-SNAPSHOT
+ large-file-reading-challenge
+ Large File Reading Challenge
+ 1.0.0
@@ -30,6 +30,7 @@
+ 1.2.1
@@ -66,6 +67,24 @@
org.springframework.boot
spring-boot-maven-plugin
+
+ org.jsonschema2pojo
+ jsonschema2pojo-maven-plugin
+ ${jsonschema2pojo.version}
+
+ ${basedir}/schema
+ eu.ztsh.lfr.model
+ true
+ false
+
+
+
+
+ generate
+
+
+
+
diff --git a/readme.md b/readme.md
index 3a00ce1..b2c770f 100644
--- a/readme.md
+++ b/readme.md
@@ -1,7 +1,8 @@
# Large file reading challenge
Welcome in the recruitment challenge.
-Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in the format array of objects with the following fields: year, averageTemperature.
+Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in
+the format array of objects with the following fields: year, averageTemperature.
## Assumptions
@@ -10,23 +11,41 @@ Write an application that, at the endpoint specified by you, returns the yearly
- The content of the source file may change during the application's running
## Example source file
+
[example_file.csv](example_file.csv)
+## Response
+
+### Schema
+
+[response_schema](schema/response.json)
+
+### Example
-## Example response
```json
[
- {
- "year": "2021",
- "averageTemperature": 12.1
- },
- {
- "year": "2022",
- "averageTemperature": 11.1
- },
- {
- "year": "2023",
- "averageTemperature": 14.1
- }
+ {
+ "year": "2021",
+ "averageTemperature": 12.1
+ },
+ {
+ "year": "2022",
+ "averageTemperature": 11.1
+ },
+ {
+ "year": "2023",
+ "averageTemperature": 14.1
+ }
]
```
+
+## Configuration
+
+| property | description | default |
+|---------------|--------------------------------|-----------------------|
+| data.file-url | path to file with temperatures | data/temperatures.csv |
+
+## Usage
+
+Temperature statistics endpoint: `/api/temperatures/{city}`
+Returns HTTP 200 when city is found, 404 otherwise.
diff --git a/schema/response.json b/schema/response.json
new file mode 100644
index 0000000..527a9e3
--- /dev/null
+++ b/schema/response.json
@@ -0,0 +1,25 @@
+{
+ "$id": "https://ztsh.eu/lfr/response.json",
+ "$schema": "http://json-schema.org/draft/2020-12/schema",
+ "type": "array",
+ "def": {
+ "average": {
+ "type": "object",
+ "properties": {
+ "year": {
+ "type": "string"
+ },
+ "averageTemperature": {
+ "type": "number"
+ }
+ },
+ "required": [
+ "year",
+ "averageTemperature"
+ ]
+ }
+ },
+ "items": {
+ "$ref": "#/def/average"
+ }
+}
diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java
index 1f3f728..328214a 100644
--- a/src/main/java/eu/ztsh/lfr/Main.java
+++ b/src/main/java/eu/ztsh/lfr/Main.java
@@ -2,8 +2,16 @@ package eu.ztsh.lfr;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
+@EnableConfigurationProperties
+@ConfigurationPropertiesScan
+@EnableAsync
+@EnableScheduling
public class Main {
public static void main(String[] args) {
diff --git a/src/main/java/eu/ztsh/lfr/config/DataProperties.java b/src/main/java/eu/ztsh/lfr/config/DataProperties.java
new file mode 100644
index 0000000..152bc52
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/config/DataProperties.java
@@ -0,0 +1,8 @@
+package eu.ztsh.lfr.config;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("data")
+public record DataProperties(String fileUrl) {
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java
new file mode 100644
index 0000000..70c1c3d
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java
@@ -0,0 +1,13 @@
+package eu.ztsh.lfr.core;
+
+import eu.ztsh.lfr.model.Average;
+import jakarta.annotation.Nonnull;
+
+import java.util.List;
+
+public interface TemperaturesService {
+
+ @Nonnull
+ List getTemperaturesFor(String city);
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java
new file mode 100644
index 0000000..e3c608c
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java
@@ -0,0 +1,42 @@
+package eu.ztsh.lfr.core.impl;
+
+import eu.ztsh.lfr.core.TemperaturesService;
+import eu.ztsh.lfr.model.Average;
+import eu.ztsh.lfr.model.Averages;
+import eu.ztsh.lfr.model.events.FileProcessedEvent;
+import jakarta.annotation.Nonnull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Service
+public class TemperaturesServiceImpl implements TemperaturesService {
+
+ private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class);
+ private final AtomicReference averages = new AtomicReference<>();
+
+ @Nonnull
+ @Override
+ public synchronized List getTemperaturesFor(String city) {
+ while (averages.get() == null) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ log.error("Thread was interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ return averages.get().getOrDefault(city.toLowerCase(), List.of());
+ }
+
+ @EventListener(FileProcessedEvent.class)
+ public synchronized void updateTemperatures(FileProcessedEvent event) {
+ averages.set(event.averages());
+ notifyAll();
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java
new file mode 100644
index 0000000..500d03f
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java
@@ -0,0 +1,100 @@
+package eu.ztsh.lfr.core.impl.files;
+
+import eu.ztsh.lfr.config.DataProperties;
+import eu.ztsh.lfr.model.Averages;
+import eu.ztsh.lfr.model.DataRow;
+import eu.ztsh.lfr.model.FileProcessingException;
+import eu.ztsh.lfr.model.Temperatures;
+import eu.ztsh.lfr.model.events.FileModifiedEvent;
+import eu.ztsh.lfr.model.events.FileProcessedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+@Service
+public class FileLoadingService {
+
+ private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class);
+ private final DataProperties dataProperties;
+ private final ApplicationEventPublisher eventPublisher;
+
+ private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>();
+ private final AtomicBoolean working = new AtomicBoolean();
+
+ @Autowired
+ public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) {
+ this.dataProperties = dataProperties;
+ this.eventPublisher = eventPublisher;
+ }
+
+ @Async
+ @EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class})
+ public void onFileModified(ApplicationEvent event) {
+ queue.add(event);
+ }
+
+ @Scheduled(fixedRate=500)
+ void runTask() {
+ if (working.get()) {
+ return;
+ }
+ ApplicationEvent event;
+ log.debug("{} events queued", queue.size());
+ do {
+ // process only last
+ event = queue.poll();
+ } while (!queue.isEmpty());
+ if (event != null) {
+ log.info("Processing event: {}", event.getClass().getSimpleName());
+ working.set(true);
+ var result = processFile();
+ eventPublisher.publishEvent(new FileProcessedEvent(result));
+ working.set(false);
+ }
+ }
+
+ private Averages processFile() {
+ var start = System.currentTimeMillis();
+ log.debug("Processing file");
+ final Temperatures model = new Temperatures();
+ try (final Stream lines = Files.lines(Paths.get(dataProperties.fileUrl()))) {
+ lines.parallel()
+ .filter(Predicate.not(String::isBlank))
+ .map(line -> {
+ try {
+ var parts = line.split(";");
+ return new DataRow(parts[0].toLowerCase(),
+ Integer.parseInt(parts[1].split("-")[0]),
+ Double.parseDouble(parts[2]));
+ } catch (Exception e) {
+ throw new FileProcessingException("Error in line %s".formatted(line), e);
+ }
+ })
+ .forEach(model::addData);
+ } catch (IOException e) {
+ throw new FileProcessingException("File error", e);
+ }
+ var millis = System.currentTimeMillis() - start;
+ log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis))
+ .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60))
+ .log();
+ return model.getAverages();
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java
new file mode 100644
index 0000000..febcbae
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java
@@ -0,0 +1,77 @@
+package eu.ztsh.lfr.core.impl.files;
+
+import eu.ztsh.lfr.config.DataProperties;
+import eu.ztsh.lfr.model.events.FileModifiedEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+
+@Service
+public class WatcherService {
+
+ private static final Logger log = LoggerFactory.getLogger(WatcherService.class);
+
+ private final WatchService watchService;
+ private final ApplicationEventPublisher eventPublisher;
+
+ private final String fileName;
+
+ @Autowired
+ public WatcherService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) throws IOException {
+ this.eventPublisher = eventPublisher;
+ this.watchService = FileSystems.getDefault().newWatchService();
+
+ var filePath = Paths.get(dataProperties.fileUrl());
+ fileName = filePath.getFileName().toString();
+ filePath.getParent().register(
+ watchService,
+ // register file creation as some editors modify files in temp, and then replaces original file.
+ StandardWatchEventKinds.ENTRY_CREATE,
+ StandardWatchEventKinds.ENTRY_MODIFY
+ );
+ }
+
+ @SuppressWarnings("BusyWait")
+ @EventListener(ApplicationReadyEvent.class)
+ public void watch() throws InterruptedException {
+ WatchKey key;
+ while ((key = watchService.take()) != null) {
+ /*
+ When modifying in IntelliJ, one line edit ends with:
+ ENTRY_CREATE: temperatures.csv~
+ ENTRY_MODIFY: temperatures.csv~
+ ENTRY_MODIFY: temperatures.csv~
+ ENTRY_MODIFY: temperatures.csv
+ ENTRY_MODIFY: temperatures.csv
+ ENTRY_MODIFY: temperatures.csv
+ Thread.sleep is used to filter out duplicated events.
+ origin: https://stackoverflow.com/a/25221600
+ */
+ Thread.sleep(50);
+ var maybeLastEvent = key.pollEvents().stream()
+ .filter(event -> event.context() != null)
+ .filter(event -> event.context().toString().endsWith(fileName))
+ .reduce((o1, o2) -> o2);
+ if (maybeLastEvent.isPresent()) {
+ var lastEvent = maybeLastEvent.get();
+ log.info("Got event of kind:{}", lastEvent.kind());
+ eventPublisher.publishEvent(new FileModifiedEvent(this));
+ } else {
+ log.trace("Got event for not watched file");
+ }
+ key.reset();
+ }
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/Averages.java b/src/main/java/eu/ztsh/lfr/model/Averages.java
new file mode 100644
index 0000000..118fe61
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/Averages.java
@@ -0,0 +1,13 @@
+package eu.ztsh.lfr.model;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Averages extends HashMap> {
+
+ public Averages(Map> m) {
+ super(m);
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/DataRow.java b/src/main/java/eu/ztsh/lfr/model/DataRow.java
new file mode 100644
index 0000000..43ac596
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/DataRow.java
@@ -0,0 +1,5 @@
+package eu.ztsh.lfr.model;
+
+public record DataRow(String city, int year, double temperature) {
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java
new file mode 100644
index 0000000..eb54b52
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java
@@ -0,0 +1,9 @@
+package eu.ztsh.lfr.model;
+
+public class FileProcessingException extends RuntimeException {
+
+ public FileProcessingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/Temperatures.java b/src/main/java/eu/ztsh/lfr/model/Temperatures.java
new file mode 100644
index 0000000..6916811
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/Temperatures.java
@@ -0,0 +1,33 @@
+package eu.ztsh.lfr.model;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+public class Temperatures extends ConcurrentHashMap> {
+
+ public void addData(DataRow dataRow) {
+ putIfAbsent(dataRow.city(), new ConcurrentHashMap<>());
+ var city = get(dataRow.city());
+ city.putIfAbsent(dataRow.year(), new YearDataAccumulator());
+ city.get(dataRow.year()).add(dataRow.temperature());
+ }
+
+ public Averages getAverages() {
+ var computed = entrySet().stream()
+ .map(entry -> {
+ var newEntry = entry.getValue().entrySet().stream()
+ .map(yearEntry -> {
+ var average = new Average();
+ average.setYear(yearEntry.getKey().toString());
+ average.setAverageTemperature(yearEntry.getValue().getAverage());
+ return average;
+ })
+ .toList();
+ return Map.entry(entry.getKey(), newEntry);
+ }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ return new Averages(computed);
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java
new file mode 100644
index 0000000..2b58474
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java
@@ -0,0 +1,25 @@
+package eu.ztsh.lfr.model;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.DoubleAdder;
+
+public class YearDataAccumulator {
+
+ private final DoubleAdder sum = new DoubleAdder();
+ private final AtomicInteger counter = new AtomicInteger();
+
+ public void add(double value) {
+ synchronized (this) {
+ sum.add(value);
+ counter.incrementAndGet();
+ }
+ }
+
+ public BigDecimal getAverage() {
+ return BigDecimal.valueOf(sum.sum()).setScale(2, RoundingMode.HALF_UP)
+ .divide(BigDecimal.valueOf(counter.get()), RoundingMode.HALF_UP);
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java
new file mode 100644
index 0000000..19860b9
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java
@@ -0,0 +1,11 @@
+package eu.ztsh.lfr.model.events;
+
+import org.springframework.context.ApplicationEvent;
+
+public class FileModifiedEvent extends ApplicationEvent {
+
+ public FileModifiedEvent(Object source) {
+ super(source);
+ }
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java
new file mode 100644
index 0000000..9e5ab8f
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java
@@ -0,0 +1,7 @@
+package eu.ztsh.lfr.model.events;
+
+import eu.ztsh.lfr.model.Averages;
+
+public record FileProcessedEvent(Averages averages) {
+
+}
diff --git a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java
new file mode 100644
index 0000000..39b583d
--- /dev/null
+++ b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java
@@ -0,0 +1,32 @@
+package eu.ztsh.lfr.web;
+
+import eu.ztsh.lfr.core.TemperaturesService;
+import eu.ztsh.lfr.model.Average;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+@RestController
+@RequestMapping("/api/temperatures")
+public class TemperaturesController {
+
+ private final TemperaturesService temperaturesService;
+
+ @Autowired
+ public TemperaturesController(TemperaturesService temperaturesService) {
+ this.temperaturesService = temperaturesService;
+ }
+
+ @GetMapping("/{city}")
+ public ResponseEntity> getTemperatures(@PathVariable String city) {
+ var data = temperaturesService.getTemperaturesFor(city);
+
+ return data.isEmpty() ? ResponseEntity.notFound().build() : ResponseEntity.ok(data);
+ }
+
+}
diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml
index e69de29..8008f0f 100644
--- a/src/main/resources/application.yaml
+++ b/src/main/resources/application.yaml
@@ -0,0 +1,2 @@
+data:
+ file-url: data/temperatures.csv
diff --git a/src/test/java/.gitkeep b/src/test/java/.gitkeep
deleted file mode 100644
index e69de29..0000000
diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java
new file mode 100644
index 0000000..86ca75c
--- /dev/null
+++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java
@@ -0,0 +1,74 @@
+package eu.ztsh.lfr.core.impl.files;
+
+import eu.ztsh.lfr.config.DataProperties;
+import eu.ztsh.lfr.model.FileProcessingException;
+import eu.ztsh.lfr.model.events.FileProcessedEvent;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationEventPublisher;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class FileLoadingServiceTest {
+
+ private final ApplicationEventPublisher eventPublisher = Mockito.mock(ApplicationEventPublisher.class);
+
+ @Test
+ void successfulRunTest() {
+ var captor = ArgumentCaptor.forClass(FileProcessedEvent.class);
+ var service = createFileLoadingService("success.csv");
+ service.onFileModified(new TestEvent());
+ assertThatNoException().isThrownBy(service::runTask);
+ Mockito.verify(eventPublisher, Mockito.times(1))
+ .publishEvent(captor.capture());
+ var averages = captor.getValue().averages();
+ assertThat(averages).hasSize(1);
+ var list = averages.get("warszawa");
+ assertThat(list).hasSize(1);
+ assertThat(list.getFirst()).satisfies(item -> {
+ assertThat(item.getYear()).isEqualTo("2018");
+ // 29.2775, but it is BigDecimal with scale 2 and round half up
+ assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28);
+ });
+ }
+
+ @Test
+ void notExistingFileTest() {
+ var service = createFileLoadingService("not-existing.csv");
+ service.onFileModified(new TestEvent());
+ assertThatThrownBy(service::runTask)
+ .isInstanceOf(FileProcessingException.class)
+ .hasRootCauseInstanceOf(IOException.class);
+ Mockito.verifyNoInteractions(eventPublisher);
+ }
+
+ @Test
+ void malformedFileTest() {
+ var service = createFileLoadingService("malformed.csv");
+ service.onFileModified(new TestEvent());
+ assertThatThrownBy(service::runTask)
+ .isInstanceOf(FileProcessingException.class)
+ .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27")
+ .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class);
+ Mockito.verifyNoInteractions(eventPublisher);
+ }
+
+ private FileLoadingService createFileLoadingService(String path) {
+ return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher);
+ }
+
+ private static class TestEvent extends ApplicationEvent {
+
+ public TestEvent() {
+ super("TestEvent");
+ }
+
+ }
+
+}
diff --git a/src/test/resources/malformed.csv b/src/test/resources/malformed.csv
new file mode 100644
index 0000000..b634989
--- /dev/null
+++ b/src/test/resources/malformed.csv
@@ -0,0 +1,4 @@
+Warszawa;2018-09-19 05:17:32.619;9.97
+Warszawa;2018-09-20 18:44:42.468;39.02
+Warszawa;2018-09-21 11:17:42.31833.27
+Warszawa;2018-09-22 19:25:07.568;34.85
diff --git a/src/test/resources/success.csv b/src/test/resources/success.csv
new file mode 100644
index 0000000..7a4d8c5
--- /dev/null
+++ b/src/test/resources/success.csv
@@ -0,0 +1,4 @@
+Warszawa;2018-09-19 05:17:32.619;9.97
+Warszawa;2018-09-20 18:44:42.468;39.02
+Warszawa;2018-09-21 11:17:42.318;33.27
+Warszawa;2018-09-22 19:25:07.568;34.85
diff --git a/util/change-event-creator.py b/util/change-event-creator.py
new file mode 100644
index 0000000..5512773
--- /dev/null
+++ b/util/change-event-creator.py
@@ -0,0 +1,2 @@
+with open('../data/temperatures.csv', 'a', encoding='utf-8') as target:
+ target.write("\n")
diff --git a/util/test-generator.py b/util/test-generator.py
new file mode 100644
index 0000000..517245c
--- /dev/null
+++ b/util/test-generator.py
@@ -0,0 +1,100 @@
+"""
+Create data in city;yyyy-mm-dd HH:mm:ss.SSS;temp format
+
+Avg line length: ~36 bytes
+Expected output file size: >3 GB -> 3 000 000 000 bytes
+
+Needed lines count = ~83.(3)m
+
+66 district cities * 75 years with measurement twice per hour = 66*75*365*24*2 = 86 724 000 -> ~3.1 GB
+"""
+
+import datetime
+import numpy as np
+
+cities = [
+ "Biała Podlaska",
+ "Białystok",
+ "Bielsko-Biała",
+ "Bydgoszcz",
+ "Bytom",
+ "Chełm",
+ "Chorzów",
+ "Częstochowa",
+ "Dąbrowa Górnicza",
+ "Elbląg",
+ "Gdańsk",
+ "Gdynia",
+ "Gliwice",
+ "Gorzów Wielkopolski",
+ "Grudziądz",
+ "Jastrzębie-Zdrój",
+ "Jaworzno",
+ "Jelenia Góra",
+ "Kalisz",
+ "Katowice",
+ "Kielce",
+ "Konin",
+ "Koszalin",
+ "Kraków",
+ "Krosno",
+ "Legnica",
+ "Leszno",
+ "Lublin",
+ "Łomża",
+ "Łódź",
+ "Mysłowice",
+ "Nowy Sącz",
+ "Olsztyn",
+ "Opole",
+ "Ostrołęka",
+ "Piekary Śląskie",
+ "Piotrków Trybunalski",
+ "Płock",
+ "Poznań",
+ "Przemyśl",
+ "Radom",
+ "Ruda Śląska",
+ "Rybnik",
+ "Rzeszów",
+ "Siedlce",
+ "Siemianowice Śląskie",
+ "Skierniewice",
+ "Słupsk",
+ "Sopot",
+ "Sosnowiec",
+ "Suwałki",
+ "Szczecin",
+ "Świętochłowice",
+ "Świnoujście",
+ "Tarnobrzeg",
+ "Tarnów",
+ "Toruń",
+ "Tychy",
+ "Wałbrzych",
+ "Włocławek",
+ "Wrocław",
+ "Zabrze",
+ "Zamość",
+ "Zielona Góra",
+ "Żory"
+]
+
+begin_date = datetime.datetime(year=1949, month=1, day=1, hour=0, minute=0, second=0)
+end_date = begin_date + datetime.timedelta(days=365 * 75)
+
+
+generator = np.random.default_rng(790492283396)
+batch = iter(generator.integers(low=-1500, high=3500, size=66*75*365*24*2))
+
+start = datetime.datetime.now()
+with open('../data/temperatures.csv', 'w', encoding='utf-8') as target:
+ for city in cities:
+ print(city)
+ now = begin_date
+ while now < end_date:
+ target.write("{};{}.000;{}\n".format(city, now, int(next(batch)) / 100.0))
+ now += datetime.timedelta(minutes=30)
+
+end = datetime.datetime.now()
+print("Completed in {}".format(end - start))