diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index ca05716..328214a 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -5,11 +5,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableConfigurationProperties @ConfigurationPropertiesScan @EnableAsync +@EnableScheduling public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 11ebb03..fa350fa 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -5,27 +5,38 @@ import eu.ztsh.lfr.model.Average; import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.events.FileProcessedEvent; import jakarta.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; @Service public class TemperaturesServiceImpl implements TemperaturesService { - private Averages averages; + private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class); + private final AtomicReference averages = new AtomicReference<>(); @Nonnull @Override - public List getTemperaturesFor(String city) { - throw new UnsupportedOperationException("Not supported yet."); + public synchronized List getTemperaturesFor(String city) { + while (averages.get() == null) { + try { + wait(); + } catch (InterruptedException e) { + log.error("Thread was interrupted", e); + Thread.currentThread().interrupt(); + } + } + return averages.get().get(city); } @EventListener(FileProcessedEvent.class) - public void updateTemperatures(FileProcessedEvent event) { - synchronized (this) { - averages = event.averages(); - } + public synchronized void updateTemperatures(FileProcessedEvent event) { + averages.set(event.averages()); + notifyAll(); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 274bd3a..31c124d 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -1,6 +1,7 @@ package eu.ztsh.lfr.core.impl.files; import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.DataRow; import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.Temperatures; @@ -9,15 +10,20 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Stream; @@ -28,6 +34,9 @@ public class FileLoadingService { private final DataProperties dataProperties; private final ApplicationEventPublisher eventPublisher; + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean working = new AtomicBoolean(); + @Autowired public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { this.dataProperties = dataProperties; @@ -35,8 +44,32 @@ public class FileLoadingService { } @Async - @EventListener(FileModifiedEvent.class) - public void onFileModified() { + @EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class}) + public void onFileModified(ApplicationEvent event) { + queue.add(event); + } + + @Scheduled(fixedRate=500) + void runTask() { + if (working.get()) { + return; + } + ApplicationEvent event; + log.debug("{} events queued", queue.size()); + do { + // process only last + event = queue.poll(); + } while (!queue.isEmpty()); + if (event != null) { + log.info("Processing event: {}", event.getClass().getSimpleName()); + working.set(true); + var result = processFile(); + eventPublisher.publishEvent(new FileProcessedEvent(result)); + working.set(false); + } + } + + private Averages processFile() { var start = System.currentTimeMillis(); log.debug("Processing file"); final Temperatures model = new Temperatures(); @@ -59,7 +92,7 @@ public class FileLoadingService { log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) .log(); - eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); + return model.getAverages(); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java index a6030a9..febcbae 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java @@ -66,7 +66,7 @@ public class WatcherService { if (maybeLastEvent.isPresent()) { var lastEvent = maybeLastEvent.get(); log.info("Got event of kind:{}", lastEvent.kind()); - eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis())); + eventPublisher.publishEvent(new FileModifiedEvent(this)); } else { log.trace("Got event for not watched file"); } diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java index c49036b..19860b9 100644 --- a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java +++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java @@ -1,5 +1,11 @@ package eu.ztsh.lfr.model.events; -public record FileModifiedEvent(long timestamp) { +import org.springframework.context.ApplicationEvent; + +public class FileModifiedEvent extends ApplicationEvent { + + public FileModifiedEvent(Object source) { + super(source); + } } diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index 9467de7..8561e2f 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -6,6 +6,7 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import java.io.IOException; @@ -22,7 +23,8 @@ class FileLoadingServiceTest { void successfulRunTest() { var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); var service = createFileLoadingService("success.csv"); - assertThatNoException().isThrownBy(service::onFileModified); + service.onFileModified(new TestEvent()); + assertThatNoException().isThrownBy(service::runTask); Mockito.verify(eventPublisher, Mockito.times(1)) .publishEvent(captor.capture()); var averages = captor.getValue().averages(); @@ -39,7 +41,8 @@ class FileLoadingServiceTest { @Test void notExistingFileTest() { var service = createFileLoadingService("not-existing.csv"); - assertThatThrownBy(service::onFileModified) + service.onFileModified(new TestEvent()); + assertThatThrownBy(service::runTask) .isInstanceOf(FileProcessingException.class) .hasRootCauseInstanceOf(IOException.class); Mockito.verifyNoInteractions(eventPublisher); @@ -48,7 +51,8 @@ class FileLoadingServiceTest { @Test void malformedFileTest() { var service = createFileLoadingService("malformed.csv"); - assertThatThrownBy(service::onFileModified) + service.onFileModified(new TestEvent()); + assertThatThrownBy(service::runTask) .isInstanceOf(FileProcessingException.class) .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); @@ -59,4 +63,12 @@ class FileLoadingServiceTest { return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); } + private static class TestEvent extends ApplicationEvent { + + public TestEvent() { + super("TestEvent"); + } + + } + }