From 1afff3819bee9dd7782136f8eeade3c21337329f Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 19:27:26 +0200 Subject: [PATCH 1/7] feat: Averages update in Temperatures service --- .../ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 8654b16..11ebb03 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -2,7 +2,10 @@ package eu.ztsh.lfr.core.impl; import eu.ztsh.lfr.core.TemperaturesService; import eu.ztsh.lfr.model.Average; +import eu.ztsh.lfr.model.Averages; +import eu.ztsh.lfr.model.events.FileProcessedEvent; import jakarta.annotation.Nonnull; +import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.List; @@ -10,10 +13,19 @@ import java.util.List; @Service public class TemperaturesServiceImpl implements TemperaturesService { + private Averages averages; + @Nonnull @Override public List getTemperaturesFor(String city) { throw new UnsupportedOperationException("Not supported yet."); } + @EventListener(FileProcessedEvent.class) + public void updateTemperatures(FileProcessedEvent event) { + synchronized (this) { + averages = event.averages(); + } + } + } From 82bf6b55cc4d11fc380b141b42f2b51ee0108cf6 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 21:15:06 +0200 Subject: [PATCH 2/7] fix: Initial state fix - fetch data and block executions until completed --- src/main/java/eu/ztsh/lfr/Main.java | 2 + .../core/impl/TemperaturesServiceImpl.java | 12 +++++- .../core/impl/files/FileLoadingService.java | 39 +++++++++++++++++-- .../lfr/core/impl/files/WatcherService.java | 2 +- .../lfr/model/events/FileModifiedEvent.java | 8 +++- .../impl/files/FileLoadingServiceTest.java | 18 +++++++-- 6 files changed, 72 insertions(+), 9 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index ca05716..328214a 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -5,11 +5,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableConfigurationProperties @ConfigurationPropertiesScan @EnableAsync +@EnableScheduling public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 11ebb03..16cf39d 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -18,13 +18,23 @@ public class TemperaturesServiceImpl implements TemperaturesService { @Nonnull @Override public List getTemperaturesFor(String city) { - throw new UnsupportedOperationException("Not supported yet."); + synchronized (this) { + while (averages == null) { + try { + wait(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + return averages.get(city); + } } @EventListener(FileProcessedEvent.class) public void updateTemperatures(FileProcessedEvent event) { synchronized (this) { averages = event.averages(); + notifyAll(); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 274bd3a..31c124d 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -1,6 +1,7 @@ package eu.ztsh.lfr.core.impl.files; import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.DataRow; import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.Temperatures; @@ -9,15 +10,20 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Stream; @@ -28,6 +34,9 @@ public class FileLoadingService { private final DataProperties dataProperties; private final ApplicationEventPublisher eventPublisher; + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean working = new AtomicBoolean(); + @Autowired public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { this.dataProperties = dataProperties; @@ -35,8 +44,32 @@ public class FileLoadingService { } @Async - @EventListener(FileModifiedEvent.class) - public void onFileModified() { + @EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class}) + public void onFileModified(ApplicationEvent event) { + queue.add(event); + } + + @Scheduled(fixedRate=500) + void runTask() { + if (working.get()) { + return; + } + ApplicationEvent event; + log.debug("{} events queued", queue.size()); + do { + // process only last + event = queue.poll(); + } while (!queue.isEmpty()); + if (event != null) { + log.info("Processing event: {}", event.getClass().getSimpleName()); + working.set(true); + var result = processFile(); + eventPublisher.publishEvent(new FileProcessedEvent(result)); + working.set(false); + } + } + + private Averages processFile() { var start = System.currentTimeMillis(); log.debug("Processing file"); final Temperatures model = new Temperatures(); @@ -59,7 +92,7 @@ public class FileLoadingService { log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) .log(); - eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); + return model.getAverages(); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java index a6030a9..febcbae 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java @@ -66,7 +66,7 @@ public class WatcherService { if (maybeLastEvent.isPresent()) { var lastEvent = maybeLastEvent.get(); log.info("Got event of kind:{}", lastEvent.kind()); - eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis())); + eventPublisher.publishEvent(new FileModifiedEvent(this)); } else { log.trace("Got event for not watched file"); } diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java index c49036b..19860b9 100644 --- a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java +++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java @@ -1,5 +1,11 @@ package eu.ztsh.lfr.model.events; -public record FileModifiedEvent(long timestamp) { +import org.springframework.context.ApplicationEvent; + +public class FileModifiedEvent extends ApplicationEvent { + + public FileModifiedEvent(Object source) { + super(source); + } } diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index 9467de7..8561e2f 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -6,6 +6,7 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import java.io.IOException; @@ -22,7 +23,8 @@ class FileLoadingServiceTest { void successfulRunTest() { var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); var service = createFileLoadingService("success.csv"); - assertThatNoException().isThrownBy(service::onFileModified); + service.onFileModified(new TestEvent()); + assertThatNoException().isThrownBy(service::runTask); Mockito.verify(eventPublisher, Mockito.times(1)) .publishEvent(captor.capture()); var averages = captor.getValue().averages(); @@ -39,7 +41,8 @@ class FileLoadingServiceTest { @Test void notExistingFileTest() { var service = createFileLoadingService("not-existing.csv"); - assertThatThrownBy(service::onFileModified) + service.onFileModified(new TestEvent()); + assertThatThrownBy(service::runTask) .isInstanceOf(FileProcessingException.class) .hasRootCauseInstanceOf(IOException.class); Mockito.verifyNoInteractions(eventPublisher); @@ -48,7 +51,8 @@ class FileLoadingServiceTest { @Test void malformedFileTest() { var service = createFileLoadingService("malformed.csv"); - assertThatThrownBy(service::onFileModified) + service.onFileModified(new TestEvent()); + assertThatThrownBy(service::runTask) .isInstanceOf(FileProcessingException.class) .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); @@ -59,4 +63,12 @@ class FileLoadingServiceTest { return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); } + private static class TestEvent extends ApplicationEvent { + + public TestEvent() { + super("TestEvent"); + } + + } + } From d5d51869df997eadfae003b7b0215d4076459128 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Thu, 18 Jul 2024 22:55:15 +0200 Subject: [PATCH 3/7] fix: InterruptedException processing compliant with SonarLint --- .../java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 16cf39d..260744f 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -5,6 +5,8 @@ import eu.ztsh.lfr.model.Average; import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.events.FileProcessedEvent; import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; @@ -13,6 +15,7 @@ import java.util.List; @Service public class TemperaturesServiceImpl implements TemperaturesService { + private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class); private Averages averages; @Nonnull @@ -23,7 +26,8 @@ public class TemperaturesServiceImpl implements TemperaturesService { try { wait(); } catch (InterruptedException e) { - throw new IllegalStateException(e); + log.error("Thread was interrupted", e); + Thread.currentThread().interrupt(); } } return averages.get(city); From 07d3e4a2a3d7e617f04e82c8cfbb552a7ac45f9a Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:01:00 +0200 Subject: [PATCH 4/7] fix: Synchronization fixes in temperature service --- .../core/impl/TemperaturesServiceImpl.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 260744f..fa350fa 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -11,35 +11,32 @@ import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; @Service public class TemperaturesServiceImpl implements TemperaturesService { private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class); - private Averages averages; + private final AtomicReference averages = new AtomicReference<>(); @Nonnull @Override - public List getTemperaturesFor(String city) { - synchronized (this) { - while (averages == null) { - try { - wait(); - } catch (InterruptedException e) { - log.error("Thread was interrupted", e); - Thread.currentThread().interrupt(); - } + public synchronized List getTemperaturesFor(String city) { + while (averages.get() == null) { + try { + wait(); + } catch (InterruptedException e) { + log.error("Thread was interrupted", e); + Thread.currentThread().interrupt(); } - return averages.get(city); } + return averages.get().get(city); } @EventListener(FileProcessedEvent.class) - public void updateTemperatures(FileProcessedEvent event) { - synchronized (this) { - averages = event.averages(); - notifyAll(); - } + public synchronized void updateTemperatures(FileProcessedEvent event) { + averages.set(event.averages()); + notifyAll(); } } From 03fad8ff96a5a1e3eb5783012d16f47a2722293d Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:11:03 +0200 Subject: [PATCH 5/7] fix: Map key comparison, preventing NPE --- .../java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java | 2 +- .../java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index fa350fa..e3c608c 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -30,7 +30,7 @@ public class TemperaturesServiceImpl implements TemperaturesService { Thread.currentThread().interrupt(); } } - return averages.get().get(city); + return averages.get().getOrDefault(city.toLowerCase(), List.of()); } @EventListener(FileProcessedEvent.class) diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 31c124d..500d03f 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -79,7 +79,9 @@ public class FileLoadingService { .map(line -> { try { var parts = line.split(";"); - return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2])); + return new DataRow(parts[0].toLowerCase(), + Integer.parseInt(parts[1].split("-")[0]), + Double.parseDouble(parts[2])); } catch (Exception e) { throw new FileProcessingException("Error in line %s".formatted(line), e); } From 03c8067796e8fca85697fa9323b39240de1468a7 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:20:38 +0200 Subject: [PATCH 6/7] feat!: Release 1.0.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 40be7ee..9c00eb4 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ eu.ztsh large-file-reading-challenge Large File Reading Challenge - 1.0.0-SNAPSHOT + 1.0.0 From 87d67652926f16f684d0bb6f7fd00f11aa5056d2 Mon Sep 17 00:00:00 2001 From: Piotr Dec Date: Fri, 19 Jul 2024 01:25:57 +0200 Subject: [PATCH 7/7] test: minor fix --- .../eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index 8561e2f..86ca75c 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -29,7 +29,7 @@ class FileLoadingServiceTest { .publishEvent(captor.capture()); var averages = captor.getValue().averages(); assertThat(averages).hasSize(1); - var list = averages.get("Warszawa"); + var list = averages.get("warszawa"); assertThat(list).hasSize(1); assertThat(list.getFirst()).satisfies(item -> { assertThat(item.getYear()).isEqualTo("2018");