From 3edf1cfb2724b47252f539e8d5d768cea6a93b06 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Mon, 15 Jul 2024 23:24:40 +0200 Subject: [PATCH 01/19] fix: Project coordinates set --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 8abc045..451cc5b 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ eu.ztsh - spring-boot-template - Spring Boot Standalone Template + large-file-reading-challenge + Large File Reading Challenge 1.0.0-SNAPSHOT From 2a3bfb3a5b6a65e8ba0061c355351ea4f70f1f65 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Mon, 15 Jul 2024 23:33:57 +0200 Subject: [PATCH 02/19] feat: Schema --- pom.xml | 19 +++++++++++++++++++ readme.md | 8 +++++++- schema/response.json | 25 +++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 schema/response.json diff --git a/pom.xml b/pom.xml index 451cc5b..40be7ee 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ + 1.2.1 @@ -66,6 +67,24 @@ org.springframework.boot spring-boot-maven-plugin + + org.jsonschema2pojo + jsonschema2pojo-maven-plugin + ${jsonschema2pojo.version} + + ${basedir}/schema + eu.ztsh.lfr.model + true + false + + + + + generate + + + + diff --git a/readme.md b/readme.md index 3a00ce1..0d417ee 100644 --- a/readme.md +++ b/readme.md @@ -13,7 +13,13 @@ Write an application that, at the endpoint specified by you, returns the yearly [example_file.csv](example_file.csv) -## Example response +## Response + +### Schema +[response_schema](schema/response.json) + +### Example + ```json [ { diff --git a/schema/response.json b/schema/response.json new file mode 100644 index 0000000..335cfe1 --- /dev/null +++ b/schema/response.json @@ -0,0 +1,25 @@ +{ + "$id": "https://ztsh.eu/lfr/response.json", + "$schema": "http://json-schema.org/draft/2020-12/schema", + "type": "array", + "def": { + "entry": { + "type": "object", + "properties": { + "year": { + "type": "string" + }, + "averageTemperature": { + "type": "number" + } + }, + "required": [ + "year", + "averageTemperature" + ] + } + }, + "items": { + "$ref": "#/def/entry" + } +} From b34f5b3301db63be8bd5bdb82dab1ba5bd617c3b Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Tue, 16 Jul 2024 21:57:52 +0200 Subject: [PATCH 03/19] feat: TemperaturesController --- readme.md | 5 +++++ .../ztsh/lfr/web/TemperaturesController.java | 20 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 src/main/java/eu/ztsh/lfr/web/TemperaturesController.java diff --git a/readme.md b/readme.md index 0d417ee..14ca349 100644 --- a/readme.md +++ b/readme.md @@ -36,3 +36,8 @@ Write an application that, at the endpoint specified by you, returns the yearly } ] ``` + +## Usage + +Temperature statistics endpoint: `/api/temperatures/{city}` +Returns HTTP 200 when city is found, 404 otherwise. diff --git a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java new file mode 100644 index 0000000..fa5cd6b --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java @@ -0,0 +1,20 @@ +package eu.ztsh.lfr.web; + +import eu.ztsh.lfr.model.Entry; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; + +@RestController +@RequestMapping("/api/temperatures") +public class TemperaturesController { + + @GetMapping("/{city}") + public List getTemperatures(@PathVariable String city) { + throw new UnsupportedOperationException("Not supported yet."); + } + +} From 2f161d34814da4d2e08c56bd3435b03ac104972a Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Tue, 16 Jul 2024 22:22:42 +0200 Subject: [PATCH 04/19] feat: TemperaturesService --- .../eu/ztsh/lfr/core/TemperaturesService.java | 13 +++++++++++++ .../core/impl/TemperaturesServiceImpl.java | 19 +++++++++++++++++++ .../ztsh/lfr/web/TemperaturesController.java | 16 ++++++++++++++-- 3 files changed, 46 insertions(+), 2 deletions(-) create mode 100644 src/main/java/eu/ztsh/lfr/core/TemperaturesService.java create mode 100644 src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java diff --git a/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java new file mode 100644 index 0000000..1ee66e3 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java @@ -0,0 +1,13 @@ +package eu.ztsh.lfr.core; + +import eu.ztsh.lfr.model.Entry; +import jakarta.annotation.Nonnull; + +import java.util.List; + +public interface TemperaturesService { + + @Nonnull + List getTemperaturesFor(String city); + +} diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java new file mode 100644 index 0000000..79b2f27 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -0,0 +1,19 @@ +package eu.ztsh.lfr.core.impl; + +import eu.ztsh.lfr.core.TemperaturesService; +import eu.ztsh.lfr.model.Entry; +import jakarta.annotation.Nonnull; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public class TemperaturesServiceImpl implements TemperaturesService { + + @Nonnull + @Override + public List getTemperaturesFor(String city) { + throw new UnsupportedOperationException("Not supported yet."); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java index fa5cd6b..52a2685 100644 --- a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java +++ b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java @@ -1,6 +1,9 @@ package eu.ztsh.lfr.web; +import eu.ztsh.lfr.core.TemperaturesService; import eu.ztsh.lfr.model.Entry; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -12,9 +15,18 @@ import java.util.List; @RequestMapping("/api/temperatures") public class TemperaturesController { + private final TemperaturesService temperaturesService; + + @Autowired + public TemperaturesController(TemperaturesService temperaturesService) { + this.temperaturesService = temperaturesService; + } + @GetMapping("/{city}") - public List getTemperatures(@PathVariable String city) { - throw new UnsupportedOperationException("Not supported yet."); + public ResponseEntity> getTemperatures(@PathVariable String city) { + var data = temperaturesService.getTemperaturesFor(city); + + return data.isEmpty() ? ResponseEntity.notFound().build() : ResponseEntity.ok(data); } } From 8d03c86c8f2d91994ba890c64f01d4192654d282 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Tue, 16 Jul 2024 22:34:22 +0200 Subject: [PATCH 05/19] feat: DataProperties --- .gitignore | 3 ++ readme.md | 36 +++++++++++-------- src/main/java/eu/ztsh/lfr/Main.java | 4 +++ .../eu/ztsh/lfr/config/DataProperties.java | 8 +++++ src/main/resources/application.yaml | 2 ++ 5 files changed, 39 insertions(+), 14 deletions(-) create mode 100644 src/main/java/eu/ztsh/lfr/config/DataProperties.java diff --git a/.gitignore b/.gitignore index a471319..f9b4200 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ target/ ### IntelliJ IDEA ### .idea/ *.iml + +# local files +data/ diff --git a/readme.md b/readme.md index 14ca349..b2c770f 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,8 @@ # Large file reading challenge Welcome in the recruitment challenge. -Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in the format array of objects with the following fields: year, averageTemperature. +Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in +the format array of objects with the following fields: year, averageTemperature. ## Assumptions @@ -10,33 +11,40 @@ Write an application that, at the endpoint specified by you, returns the yearly - The content of the source file may change during the application's running ## Example source file -[example_file.csv](example_file.csv) +[example_file.csv](example_file.csv) ## Response ### Schema + [response_schema](schema/response.json) ### Example ```json [ - { - "year": "2021", - "averageTemperature": 12.1 - }, - { - "year": "2022", - "averageTemperature": 11.1 - }, - { - "year": "2023", - "averageTemperature": 14.1 - } + { + "year": "2021", + "averageTemperature": 12.1 + }, + { + "year": "2022", + "averageTemperature": 11.1 + }, + { + "year": "2023", + "averageTemperature": 14.1 + } ] ``` +## Configuration + +| property | description | default | +|---------------|--------------------------------|-----------------------| +| data.file-url | path to file with temperatures | data/temperatures.csv | + ## Usage Temperature statistics endpoint: `/api/temperatures/{city}` diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index 1f3f728..569eded 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -2,8 +2,12 @@ package eu.ztsh.lfr; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication +@EnableConfigurationProperties +@ConfigurationPropertiesScan public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/config/DataProperties.java b/src/main/java/eu/ztsh/lfr/config/DataProperties.java new file mode 100644 index 0000000..152bc52 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/config/DataProperties.java @@ -0,0 +1,8 @@ +package eu.ztsh.lfr.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("data") +public record DataProperties(String fileUrl) { + +} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index e69de29..8008f0f 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -0,0 +1,2 @@ +data: + file-url: data/temperatures.csv From 12ca1369f3d91d177c90cc963367249e5abd698b Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Tue, 16 Jul 2024 23:39:47 +0200 Subject: [PATCH 06/19] feat: File watching service --- .../eu/ztsh/lfr/files/FileModifiedEvent.java | 5 ++ .../eu/ztsh/lfr/files/WatcherService.java | 76 +++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java create mode 100644 src/main/java/eu/ztsh/lfr/files/WatcherService.java diff --git a/src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java new file mode 100644 index 0000000..332b898 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java @@ -0,0 +1,5 @@ +package eu.ztsh.lfr.files; + +public record FileModifiedEvent(long timestamp) { + +} diff --git a/src/main/java/eu/ztsh/lfr/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/files/WatcherService.java new file mode 100644 index 0000000..c021211 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/files/WatcherService.java @@ -0,0 +1,76 @@ +package eu.ztsh.lfr.files; + +import eu.ztsh.lfr.config.DataProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; + +@Service +public class WatcherService { + + private static final Logger log = LoggerFactory.getLogger(WatcherService.class); + + private final WatchService watchService; + private final ApplicationEventPublisher eventPublisher; + + private final String fileName; + + @Autowired + public WatcherService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) throws IOException { + this.eventPublisher = eventPublisher; + this.watchService = FileSystems.getDefault().newWatchService(); + + var filePath = Paths.get(dataProperties.fileUrl()); + fileName = filePath.getFileName().toString(); + filePath.getParent().register( + watchService, + // register file creation as some editors modify files in temp, and then replaces original file. + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY + ); + } + + @SuppressWarnings("BusyWait") + @EventListener(ApplicationReadyEvent.class) + public void watch() throws InterruptedException { + WatchKey key; + while ((key = watchService.take()) != null) { + /* + When modifying in IntelliJ, one line edit ends with: + ENTRY_CREATE: temperatures.csv~ + ENTRY_MODIFY: temperatures.csv~ + ENTRY_MODIFY: temperatures.csv~ + ENTRY_MODIFY: temperatures.csv + ENTRY_MODIFY: temperatures.csv + ENTRY_MODIFY: temperatures.csv + Thread.sleep is used to filter out duplicated events. + origin: https://stackoverflow.com/a/25221600 + */ + Thread.sleep(50); + var maybeLastEvent = key.pollEvents().stream() + .filter(event -> event.context() != null) + .filter(event -> event.context().toString().endsWith(fileName)) + .reduce((o1, o2) -> o2); + if (maybeLastEvent.isPresent()) { + var lastEvent = maybeLastEvent.get(); + log.info("Got event of kind:{}", lastEvent.kind()); + eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis())); + } else { + log.trace("Got event for not watched file"); + } + key.reset(); + } + } + +} From 625dd631370e4f47fd26e7960ce418646c4a0fa3 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Wed, 17 Jul 2024 21:37:53 +0200 Subject: [PATCH 07/19] chore: Python test data generator --- util/test-generator.py | 100 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 util/test-generator.py diff --git a/util/test-generator.py b/util/test-generator.py new file mode 100644 index 0000000..517245c --- /dev/null +++ b/util/test-generator.py @@ -0,0 +1,100 @@ +""" +Create data in city;yyyy-mm-dd HH:mm:ss.SSS;temp format + +Avg line length: ~36 bytes +Expected output file size: >3 GB -> 3 000 000 000 bytes + +Needed lines count = ~83.(3)m + +66 district cities * 75 years with measurement twice per hour = 66*75*365*24*2 = 86 724 000 -> ~3.1 GB +""" + +import datetime +import numpy as np + +cities = [ + "Biała Podlaska", + "Białystok", + "Bielsko-Biała", + "Bydgoszcz", + "Bytom", + "Chełm", + "Chorzów", + "Częstochowa", + "Dąbrowa Górnicza", + "Elbląg", + "Gdańsk", + "Gdynia", + "Gliwice", + "Gorzów Wielkopolski", + "Grudziądz", + "Jastrzębie-Zdrój", + "Jaworzno", + "Jelenia Góra", + "Kalisz", + "Katowice", + "Kielce", + "Konin", + "Koszalin", + "Kraków", + "Krosno", + "Legnica", + "Leszno", + "Lublin", + "Łomża", + "Łódź", + "Mysłowice", + "Nowy Sącz", + "Olsztyn", + "Opole", + "Ostrołęka", + "Piekary Śląskie", + "Piotrków Trybunalski", + "Płock", + "Poznań", + "Przemyśl", + "Radom", + "Ruda Śląska", + "Rybnik", + "Rzeszów", + "Siedlce", + "Siemianowice Śląskie", + "Skierniewice", + "Słupsk", + "Sopot", + "Sosnowiec", + "Suwałki", + "Szczecin", + "Świętochłowice", + "Świnoujście", + "Tarnobrzeg", + "Tarnów", + "Toruń", + "Tychy", + "Wałbrzych", + "Włocławek", + "Wrocław", + "Zabrze", + "Zamość", + "Zielona Góra", + "Żory" +] + +begin_date = datetime.datetime(year=1949, month=1, day=1, hour=0, minute=0, second=0) +end_date = begin_date + datetime.timedelta(days=365 * 75) + + +generator = np.random.default_rng(790492283396) +batch = iter(generator.integers(low=-1500, high=3500, size=66*75*365*24*2)) + +start = datetime.datetime.now() +with open('../data/temperatures.csv', 'w', encoding='utf-8') as target: + for city in cities: + print(city) + now = begin_date + while now < end_date: + target.write("{};{}.000;{}\n".format(city, now, int(next(batch)) / 100.0)) + now += datetime.timedelta(minutes=30) + +end = datetime.datetime.now() +print("Completed in {}".format(end - start)) From f63604a18ab436428ab2d05900c27ced3f7d021a Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 02:36:59 +0200 Subject: [PATCH 08/19] fix: Entry renamed to Average --- schema/response.json | 4 ++-- src/main/java/eu/ztsh/lfr/core/TemperaturesService.java | 4 ++-- .../java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 4 ++-- src/main/java/eu/ztsh/lfr/web/TemperaturesController.java | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/schema/response.json b/schema/response.json index 335cfe1..527a9e3 100644 --- a/schema/response.json +++ b/schema/response.json @@ -3,7 +3,7 @@ "$schema": "http://json-schema.org/draft/2020-12/schema", "type": "array", "def": { - "entry": { + "average": { "type": "object", "properties": { "year": { @@ -20,6 +20,6 @@ } }, "items": { - "$ref": "#/def/entry" + "$ref": "#/def/average" } } diff --git a/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java index 1ee66e3..70c1c3d 100644 --- a/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java +++ b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java @@ -1,6 +1,6 @@ package eu.ztsh.lfr.core; -import eu.ztsh.lfr.model.Entry; +import eu.ztsh.lfr.model.Average; import jakarta.annotation.Nonnull; import java.util.List; @@ -8,6 +8,6 @@ import java.util.List; public interface TemperaturesService { @Nonnull - List getTemperaturesFor(String city); + List getTemperaturesFor(String city); } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 79b2f27..8654b16 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -1,7 +1,7 @@ package eu.ztsh.lfr.core.impl; import eu.ztsh.lfr.core.TemperaturesService; -import eu.ztsh.lfr.model.Entry; +import eu.ztsh.lfr.model.Average; import jakarta.annotation.Nonnull; import org.springframework.stereotype.Service; @@ -12,7 +12,7 @@ public class TemperaturesServiceImpl implements TemperaturesService { @Nonnull @Override - public List getTemperaturesFor(String city) { + public List getTemperaturesFor(String city) { throw new UnsupportedOperationException("Not supported yet."); } diff --git a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java index 52a2685..39b583d 100644 --- a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java +++ b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java @@ -1,7 +1,7 @@ package eu.ztsh.lfr.web; import eu.ztsh.lfr.core.TemperaturesService; -import eu.ztsh.lfr.model.Entry; +import eu.ztsh.lfr.model.Average; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; @@ -23,7 +23,7 @@ public class TemperaturesController { } @GetMapping("/{city}") - public ResponseEntity> getTemperatures(@PathVariable String city) { + public ResponseEntity> getTemperatures(@PathVariable String city) { var data = temperaturesService.getTemperaturesFor(city); return data.isEmpty() ? ResponseEntity.notFound().build() : ResponseEntity.ok(data); From f0b70968656b59e2bb314cf5a61cda1254efabd8 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 02:46:24 +0200 Subject: [PATCH 09/19] fix: Minor packages reorganization. --- .../java/eu/ztsh/lfr/{ => core/impl}/files/WatcherService.java | 3 ++- .../eu/ztsh/lfr/{files => model/events}/FileModifiedEvent.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) rename src/main/java/eu/ztsh/lfr/{ => core/impl}/files/WatcherService.java (97%) rename src/main/java/eu/ztsh/lfr/{files => model/events}/FileModifiedEvent.java (61%) diff --git a/src/main/java/eu/ztsh/lfr/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java similarity index 97% rename from src/main/java/eu/ztsh/lfr/files/WatcherService.java rename to src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java index c021211..a6030a9 100644 --- a/src/main/java/eu/ztsh/lfr/files/WatcherService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java @@ -1,6 +1,7 @@ -package eu.ztsh.lfr.files; +package eu.ztsh.lfr.core.impl.files; import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.events.FileModifiedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java similarity index 61% rename from src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java rename to src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java index 332b898..c49036b 100644 --- a/src/main/java/eu/ztsh/lfr/files/FileModifiedEvent.java +++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java @@ -1,4 +1,4 @@ -package eu.ztsh.lfr.files; +package eu.ztsh.lfr.model.events; public record FileModifiedEvent(long timestamp) { From 2025cca693c59db7820acaf03ba996251a63a94e Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 02:51:18 +0200 Subject: [PATCH 10/19] feat: File processing. --- src/main/java/eu/ztsh/lfr/Main.java | 2 + .../core/impl/files/FileLoadingService.java | 66 +++++++++++++++++++ src/main/java/eu/ztsh/lfr/model/Averages.java | 13 ++++ src/main/java/eu/ztsh/lfr/model/DataRow.java | 5 ++ .../lfr/model/FileProcessingException.java | 9 +++ .../java/eu/ztsh/lfr/model/Temperatures.java | 33 ++++++++++ .../ztsh/lfr/model/YearDataAccumulator.java | 25 +++++++ .../lfr/model/events/FileProcessedEvent.java | 7 ++ util/change-event-creator.py | 2 + 9 files changed, 162 insertions(+) create mode 100644 src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java create mode 100644 src/main/java/eu/ztsh/lfr/model/Averages.java create mode 100644 src/main/java/eu/ztsh/lfr/model/DataRow.java create mode 100644 src/main/java/eu/ztsh/lfr/model/FileProcessingException.java create mode 100644 src/main/java/eu/ztsh/lfr/model/Temperatures.java create mode 100644 src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java create mode 100644 src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java create mode 100644 util/change-event-creator.py diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index 569eded..ca05716 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -4,10 +4,12 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication @EnableConfigurationProperties @ConfigurationPropertiesScan +@EnableAsync public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java new file mode 100644 index 0000000..844bdff --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -0,0 +1,66 @@ +package eu.ztsh.lfr.core.impl.files; + +import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.FileProcessingException; +import eu.ztsh.lfr.model.DataRow; +import eu.ztsh.lfr.model.events.FileModifiedEvent; +import eu.ztsh.lfr.model.Temperatures; +import eu.ztsh.lfr.model.events.FileProcessedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; + +@Service +public class FileLoadingService { + + private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class); + private final DataProperties dataProperties; + private final ApplicationEventPublisher eventPublisher; + + + @Autowired + public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { + this.dataProperties = dataProperties; + this.eventPublisher = eventPublisher; + } + + @Async + @EventListener(FileModifiedEvent.class) + public void onFileModified() { + var start = System.currentTimeMillis(); + log.debug("Processing file"); + final Temperatures model = new Temperatures(); + try (final Stream lines = Files.lines(Paths.get(dataProperties.fileUrl()))) { + lines.parallel() + .filter(Predicate.not(String::isBlank)) + .map(line -> { + try { + var parts = line.split(";"); + return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2])); + } catch (Exception e) { + throw new FileProcessingException("Error in line %s".formatted(line), e); + } + }) + .forEach(model::addData); + } catch (IOException e) { + throw new FileProcessingException("File error", e); + } + var millis = System.currentTimeMillis() - start; + log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) + .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) + .log(); + eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/Averages.java b/src/main/java/eu/ztsh/lfr/model/Averages.java new file mode 100644 index 0000000..118fe61 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/Averages.java @@ -0,0 +1,13 @@ +package eu.ztsh.lfr.model; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Averages extends HashMap> { + + public Averages(Map> m) { + super(m); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/DataRow.java b/src/main/java/eu/ztsh/lfr/model/DataRow.java new file mode 100644 index 0000000..43ac596 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/DataRow.java @@ -0,0 +1,5 @@ +package eu.ztsh.lfr.model; + +public record DataRow(String city, int year, double temperature) { + +} diff --git a/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java new file mode 100644 index 0000000..eb54b52 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java @@ -0,0 +1,9 @@ +package eu.ztsh.lfr.model; + +public class FileProcessingException extends RuntimeException { + + public FileProcessingException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/Temperatures.java b/src/main/java/eu/ztsh/lfr/model/Temperatures.java new file mode 100644 index 0000000..6916811 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/Temperatures.java @@ -0,0 +1,33 @@ +package eu.ztsh.lfr.model; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +public class Temperatures extends ConcurrentHashMap> { + + public void addData(DataRow dataRow) { + putIfAbsent(dataRow.city(), new ConcurrentHashMap<>()); + var city = get(dataRow.city()); + city.putIfAbsent(dataRow.year(), new YearDataAccumulator()); + city.get(dataRow.year()).add(dataRow.temperature()); + } + + public Averages getAverages() { + var computed = entrySet().stream() + .map(entry -> { + var newEntry = entry.getValue().entrySet().stream() + .map(yearEntry -> { + var average = new Average(); + average.setYear(yearEntry.getKey().toString()); + average.setAverageTemperature(yearEntry.getValue().getAverage()); + return average; + }) + .toList(); + return Map.entry(entry.getKey(), newEntry); + }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new Averages(computed); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java new file mode 100644 index 0000000..2b58474 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java @@ -0,0 +1,25 @@ +package eu.ztsh.lfr.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +public class YearDataAccumulator { + + private final DoubleAdder sum = new DoubleAdder(); + private final AtomicInteger counter = new AtomicInteger(); + + public void add(double value) { + synchronized (this) { + sum.add(value); + counter.incrementAndGet(); + } + } + + public BigDecimal getAverage() { + return BigDecimal.valueOf(sum.sum()).setScale(2, RoundingMode.HALF_UP) + .divide(BigDecimal.valueOf(counter.get()), RoundingMode.HALF_UP); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java new file mode 100644 index 0000000..9e5ab8f --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java @@ -0,0 +1,7 @@ +package eu.ztsh.lfr.model.events; + +import eu.ztsh.lfr.model.Averages; + +public record FileProcessedEvent(Averages averages) { + +} diff --git a/util/change-event-creator.py b/util/change-event-creator.py new file mode 100644 index 0000000..5512773 --- /dev/null +++ b/util/change-event-creator.py @@ -0,0 +1,2 @@ +with open('../data/temperatures.csv', 'a', encoding='utf-8') as target: + target.write("\n") From 141551c3d8d186554ac96475939ab78360a593e3 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 19:14:03 +0200 Subject: [PATCH 11/19] test: File processing service unit tests --- src/test/java/.gitkeep | 0 .../impl/files/FileLoadingServiceTest.java | 62 +++++++++++++++++++ src/test/resources/malformed.csv | 4 ++ src/test/resources/success.csv | 4 ++ 4 files changed, 70 insertions(+) delete mode 100644 src/test/java/.gitkeep create mode 100644 src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java create mode 100644 src/test/resources/malformed.csv create mode 100644 src/test/resources/success.csv diff --git a/src/test/java/.gitkeep b/src/test/java/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java new file mode 100644 index 0000000..d747f36 --- /dev/null +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -0,0 +1,62 @@ +package eu.ztsh.lfr.core.impl.files; + +import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.FileProcessingException; +import eu.ztsh.lfr.model.events.FileProcessedEvent; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.context.ApplicationEventPublisher; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class FileLoadingServiceTest { + + private final ApplicationEventPublisher eventPublisher = Mockito.mock(ApplicationEventPublisher.class); + + @Test + void successfulRunTest() { + var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); + var service = createFileLoadingService("success.csv"); + assertThatNoException().isThrownBy(service::onFileModified); + Mockito.verify(eventPublisher, Mockito.times(1)) + .publishEvent(captor.capture()); + var averages = captor.getValue().averages(); + assertThat(averages).hasSize(1); + var list = averages.get("Warszawa"); + assertThat(list).hasSize(1); + assertThat(list.getFirst()).satisfies(item -> { + assertThat(item.getYear()).isEqualTo("2018"); + // 29.2775, but it is BigDecimal with scale 2 and round half up + assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28); + }); + } + + @Test + void notExistingFileTest() { + var service = createFileLoadingService("not-existing.csv"); + assertThatThrownBy(service::onFileModified) + .isInstanceOf(FileProcessingException.class) + .hasRootCauseInstanceOf(IOException.class); + Mockito.verifyNoInteractions(eventPublisher); + } + + @Test + void malformedFileTest() { + var service = createFileLoadingService("malformed.csv"); + assertThatThrownBy(service::onFileModified) + .isInstanceOf(FileProcessingException.class) + .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") + .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); + Mockito.verifyNoInteractions(eventPublisher); + } + + private FileLoadingService createFileLoadingService(String path) { + return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); + } + +} diff --git a/src/test/resources/malformed.csv b/src/test/resources/malformed.csv new file mode 100644 index 0000000..b634989 --- /dev/null +++ b/src/test/resources/malformed.csv @@ -0,0 +1,4 @@ +Warszawa;2018-09-19 05:17:32.619;9.97 +Warszawa;2018-09-20 18:44:42.468;39.02 +Warszawa;2018-09-21 11:17:42.31833.27 +Warszawa;2018-09-22 19:25:07.568;34.85 diff --git a/src/test/resources/success.csv b/src/test/resources/success.csv new file mode 100644 index 0000000..7a4d8c5 --- /dev/null +++ b/src/test/resources/success.csv @@ -0,0 +1,4 @@ +Warszawa;2018-09-19 05:17:32.619;9.97 +Warszawa;2018-09-20 18:44:42.468;39.02 +Warszawa;2018-09-21 11:17:42.318;33.27 +Warszawa;2018-09-22 19:25:07.568;34.85 From bd452eff1786958b2a90d16e733e134a0f7a431a Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 19:18:37 +0200 Subject: [PATCH 12/19] chore: Files formatting --- .../java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java | 5 ++--- .../eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 844bdff..274bd3a 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -1,10 +1,10 @@ package eu.ztsh.lfr.core.impl.files; import eu.ztsh.lfr.config.DataProperties; -import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.DataRow; -import eu.ztsh.lfr.model.events.FileModifiedEvent; +import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.Temperatures; +import eu.ztsh.lfr.model.events.FileModifiedEvent; import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +28,6 @@ public class FileLoadingService { private final DataProperties dataProperties; private final ApplicationEventPublisher eventPublisher; - @Autowired public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { this.dataProperties = dataProperties; diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index d747f36..9467de7 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -30,9 +30,9 @@ class FileLoadingServiceTest { var list = averages.get("Warszawa"); assertThat(list).hasSize(1); assertThat(list.getFirst()).satisfies(item -> { - assertThat(item.getYear()).isEqualTo("2018"); + assertThat(item.getYear()).isEqualTo("2018"); // 29.2775, but it is BigDecimal with scale 2 and round half up - assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28); + assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28); }); } From 1afff3819bee9dd7782136f8eeade3c21337329f Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 19:27:26 +0200 Subject: [PATCH 13/19] feat: Averages update in Temperatures service --- .../ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 8654b16..11ebb03 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -2,7 +2,10 @@ package eu.ztsh.lfr.core.impl; import eu.ztsh.lfr.core.TemperaturesService; import eu.ztsh.lfr.model.Average; +import eu.ztsh.lfr.model.Averages; +import eu.ztsh.lfr.model.events.FileProcessedEvent; import jakarta.annotation.Nonnull; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.List; @@ -10,10 +13,19 @@ import java.util.List; @Service public class TemperaturesServiceImpl implements TemperaturesService { + private Averages averages; + @Nonnull @Override public List getTemperaturesFor(String city) { throw new UnsupportedOperationException("Not supported yet."); } + @EventListener(FileProcessedEvent.class) + public void updateTemperatures(FileProcessedEvent event) { + synchronized (this) { + averages = event.averages(); + } + } + } From 82bf6b55cc4d11fc380b141b42f2b51ee0108cf6 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 21:15:06 +0200 Subject: [PATCH 14/19] fix: Initial state fix - fetch data and block executions until completed --- src/main/java/eu/ztsh/lfr/Main.java | 2 + .../core/impl/TemperaturesServiceImpl.java | 12 +++++- .../core/impl/files/FileLoadingService.java | 39 +++++++++++++++++-- .../lfr/core/impl/files/WatcherService.java | 2 +- .../lfr/model/events/FileModifiedEvent.java | 8 +++- .../impl/files/FileLoadingServiceTest.java | 18 +++++++-- 6 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index ca05716..328214a 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -5,11 +5,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableConfigurationProperties @ConfigurationPropertiesScan @EnableAsync +@EnableScheduling public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 11ebb03..16cf39d 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -18,13 +18,23 @@ public class TemperaturesServiceImpl implements TemperaturesService { @Nonnull @Override public List getTemperaturesFor(String city) { - throw new UnsupportedOperationException("Not supported yet."); + synchronized (this) { + while (averages == null) { + try { + wait(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + return averages.get(city); + } } @EventListener(FileProcessedEvent.class) public void updateTemperatures(FileProcessedEvent event) { synchronized (this) { averages = event.averages(); + notifyAll(); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 274bd3a..31c124d 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -1,6 +1,7 @@ package eu.ztsh.lfr.core.impl.files; import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.DataRow; import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.Temperatures; @@ -9,15 +10,20 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Stream; @@ -28,6 +34,9 @@ public class FileLoadingService { private final DataProperties dataProperties; private final ApplicationEventPublisher eventPublisher; + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean working = new AtomicBoolean(); + @Autowired public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { this.dataProperties = dataProperties; @@ -35,8 +44,32 @@ public class FileLoadingService { } @Async - @EventListener(FileModifiedEvent.class) - public void onFileModified() { + @EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class}) + public void onFileModified(ApplicationEvent event) { + queue.add(event); + } + + @Scheduled(fixedRate=500) + void runTask() { + if (working.get()) { + return; + } + ApplicationEvent event; + log.debug("{} events queued", queue.size()); + do { + // process only last + event = queue.poll(); + } while (!queue.isEmpty()); + if (event != null) { + log.info("Processing event: {}", event.getClass().getSimpleName()); + working.set(true); + var result = processFile(); + eventPublisher.publishEvent(new FileProcessedEvent(result)); + working.set(false); + } + } + + private Averages processFile() { var start = System.currentTimeMillis(); log.debug("Processing file"); final Temperatures model = new Temperatures(); @@ -59,7 +92,7 @@ public class FileLoadingService { log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) .log(); - eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); + return model.getAverages(); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java index a6030a9..febcbae 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java @@ -66,7 +66,7 @@ public class WatcherService { if (maybeLastEvent.isPresent()) { var lastEvent = maybeLastEvent.get(); log.info("Got event of kind:{}", lastEvent.kind()); - eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis())); + eventPublisher.publishEvent(new FileModifiedEvent(this)); } else { log.trace("Got event for not watched file"); } diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java index c49036b..19860b9 100644 --- a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java +++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java @@ -1,5 +1,11 @@ package eu.ztsh.lfr.model.events; -public record FileModifiedEvent(long timestamp) { +import org.springframework.context.ApplicationEvent; + +public class FileModifiedEvent extends ApplicationEvent { + + public FileModifiedEvent(Object source) { + super(source); + } } diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index 9467de7..8561e2f 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -6,6 +6,7 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import java.io.IOException; @@ -22,7 +23,8 @@ class FileLoadingServiceTest { void successfulRunTest() { var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); var service = createFileLoadingService("success.csv"); - assertThatNoException().isThrownBy(service::onFileModified); + service.onFileModified(new TestEvent()); + assertThatNoException().isThrownBy(service::runTask); Mockito.verify(eventPublisher, Mockito.times(1)) .publishEvent(captor.capture()); var averages = captor.getValue().averages(); @@ -39,7 +41,8 @@ class FileLoadingServiceTest { @Test void notExistingFileTest() { var service = createFileLoadingService("not-existing.csv"); - assertThatThrownBy(service::onFileModified) + service.onFileModified(new TestEvent()); + assertThatThrownBy(service::runTask) .isInstanceOf(FileProcessingException.class) .hasRootCauseInstanceOf(IOException.class); Mockito.verifyNoInteractions(eventPublisher); @@ -48,7 +51,8 @@ class FileLoadingServiceTest { @Test void malformedFileTest() { var service = createFileLoadingService("malformed.csv"); - assertThatThrownBy(service::onFileModified) + service.onFileModified(new TestEvent()); + assertThatThrownBy(service::runTask) .isInstanceOf(FileProcessingException.class) .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); @@ -59,4 +63,12 @@ class FileLoadingServiceTest { return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); } + private static class TestEvent extends ApplicationEvent { + + public TestEvent() { + super("TestEvent"); + } + + } + } From d5d51869df997eadfae003b7b0215d4076459128 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 22:55:15 +0200 Subject: [PATCH 15/19] fix: InterruptedException processing compliant with SonarLint --- .../java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 16cf39d..260744f 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -5,6 +5,8 @@ import eu.ztsh.lfr.model.Average; import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.events.FileProcessedEvent; import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; @@ -13,6 +15,7 @@ import java.util.List; @Service public class TemperaturesServiceImpl implements TemperaturesService { + private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class); private Averages averages; @Nonnull @@ -23,7 +26,8 @@ public class TemperaturesServiceImpl implements TemperaturesService { try { wait(); } catch (InterruptedException e) { - throw new IllegalStateException(e); + log.error("Thread was interrupted", e); + Thread.currentThread().interrupt(); } } return averages.get(city); From 07d3e4a2a3d7e617f04e82c8cfbb552a7ac45f9a Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:01:00 +0200 Subject: [PATCH 16/19] fix: Synchronization fixes in temperature service --- .../core/impl/TemperaturesServiceImpl.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 260744f..fa350fa 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -11,35 +11,32 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; @Service public class TemperaturesServiceImpl implements TemperaturesService { private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class); - private Averages averages; + private final AtomicReference averages = new AtomicReference<>(); @Nonnull @Override - public List getTemperaturesFor(String city) { - synchronized (this) { - while (averages == null) { - try { - wait(); - } catch (InterruptedException e) { - log.error("Thread was interrupted", e); - Thread.currentThread().interrupt(); - } + public synchronized List getTemperaturesFor(String city) { + while (averages.get() == null) { + try { + wait(); + } catch (InterruptedException e) { + log.error("Thread was interrupted", e); + Thread.currentThread().interrupt(); } - return averages.get(city); } + return averages.get().get(city); } @EventListener(FileProcessedEvent.class) - public void updateTemperatures(FileProcessedEvent event) { - synchronized (this) { - averages = event.averages(); - notifyAll(); - } + public synchronized void updateTemperatures(FileProcessedEvent event) { + averages.set(event.averages()); + notifyAll(); } } From 03fad8ff96a5a1e3eb5783012d16f47a2722293d Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:11:03 +0200 Subject: [PATCH 17/19] fix: Map key comparison, preventing NPE --- .../java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 2 +- .../java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index fa350fa..e3c608c 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -30,7 +30,7 @@ public class TemperaturesServiceImpl implements TemperaturesService { Thread.currentThread().interrupt(); } } - return averages.get().get(city); + return averages.get().getOrDefault(city.toLowerCase(), List.of()); } @EventListener(FileProcessedEvent.class) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 31c124d..500d03f 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -79,7 +79,9 @@ public class FileLoadingService { .map(line -> { try { var parts = line.split(";"); - return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2])); + return new DataRow(parts[0].toLowerCase(), + Integer.parseInt(parts[1].split("-")[0]), + Double.parseDouble(parts[2])); } catch (Exception e) { throw new FileProcessingException("Error in line %s".formatted(line), e); } From 03c8067796e8fca85697fa9323b39240de1468a7 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:20:38 +0200 Subject: [PATCH 18/19] feat!: Release 1.0.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 40be7ee..9c00eb4 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ eu.ztsh large-file-reading-challenge Large File Reading Challenge - 1.0.0-SNAPSHOT + 1.0.0 From 87d67652926f16f684d0bb6f7fd00f11aa5056d2 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:25:57 +0200 Subject: [PATCH 19/19] test: minor fix --- .../eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index 8561e2f..86ca75c 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -29,7 +29,7 @@ class FileLoadingServiceTest { .publishEvent(captor.capture()); var averages = captor.getValue().averages(); assertThat(averages).hasSize(1); - var list = averages.get("Warszawa"); + var list = averages.get("warszawa"); assertThat(list).hasSize(1); assertThat(list.getFirst()).satisfies(item -> { assertThat(item.getYear()).isEqualTo("2018");