diff --git a/pom.xml b/pom.xml index 9c00eb4..40be7ee 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ eu.ztsh large-file-reading-challenge Large File Reading Challenge - 1.0.0 + 1.0.0-SNAPSHOT diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index 328214a..ca05716 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -5,13 +5,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableConfigurationProperties @ConfigurationPropertiesScan @EnableAsync -@EnableScheduling public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index e3c608c..8654b16 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -2,41 +2,18 @@ package eu.ztsh.lfr.core.impl; import eu.ztsh.lfr.core.TemperaturesService; import eu.ztsh.lfr.model.Average; -import eu.ztsh.lfr.model.Averages; -import eu.ztsh.lfr.model.events.FileProcessedEvent; import jakarta.annotation.Nonnull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; @Service public class TemperaturesServiceImpl implements TemperaturesService { - private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class); - private final AtomicReference averages = new AtomicReference<>(); - @Nonnull @Override - public synchronized List getTemperaturesFor(String city) { - while (averages.get() == null) { - try { - wait(); - } catch (InterruptedException e) { - log.error("Thread was interrupted", e); - Thread.currentThread().interrupt(); - } - } - return averages.get().getOrDefault(city.toLowerCase(), List.of()); - } - - @EventListener(FileProcessedEvent.class) - public synchronized void updateTemperatures(FileProcessedEvent event) { - averages.set(event.averages()); - notifyAll(); + public List getTemperaturesFor(String city) { + throw new UnsupportedOperationException("Not supported yet."); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java index 500d03f..274bd3a 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -1,7 +1,6 @@ package eu.ztsh.lfr.core.impl.files; import eu.ztsh.lfr.config.DataProperties; -import eu.ztsh.lfr.model.Averages; import eu.ztsh.lfr.model.DataRow; import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.Temperatures; @@ -10,20 +9,15 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import java.util.stream.Stream; @@ -34,9 +28,6 @@ public class FileLoadingService { private final DataProperties dataProperties; private final ApplicationEventPublisher eventPublisher; - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private final AtomicBoolean working = new AtomicBoolean(); - @Autowired public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { this.dataProperties = dataProperties; @@ -44,32 +35,8 @@ public class FileLoadingService { } @Async - @EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class}) - public void onFileModified(ApplicationEvent event) { - queue.add(event); - } - - @Scheduled(fixedRate=500) - void runTask() { - if (working.get()) { - return; - } - ApplicationEvent event; - log.debug("{} events queued", queue.size()); - do { - // process only last - event = queue.poll(); - } while (!queue.isEmpty()); - if (event != null) { - log.info("Processing event: {}", event.getClass().getSimpleName()); - working.set(true); - var result = processFile(); - eventPublisher.publishEvent(new FileProcessedEvent(result)); - working.set(false); - } - } - - private Averages processFile() { + @EventListener(FileModifiedEvent.class) + public void onFileModified() { var start = System.currentTimeMillis(); log.debug("Processing file"); final Temperatures model = new Temperatures(); @@ -79,9 +46,7 @@ public class FileLoadingService { .map(line -> { try { var parts = line.split(";"); - return new DataRow(parts[0].toLowerCase(), - Integer.parseInt(parts[1].split("-")[0]), - Double.parseDouble(parts[2])); + return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2])); } catch (Exception e) { throw new FileProcessingException("Error in line %s".formatted(line), e); } @@ -94,7 +59,7 @@ public class FileLoadingService { log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) .log(); - return model.getAverages(); + eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); } } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java index febcbae..a6030a9 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java @@ -66,7 +66,7 @@ public class WatcherService { if (maybeLastEvent.isPresent()) { var lastEvent = maybeLastEvent.get(); log.info("Got event of kind:{}", lastEvent.kind()); - eventPublisher.publishEvent(new FileModifiedEvent(this)); + eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis())); } else { log.trace("Got event for not watched file"); } diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java index 19860b9..c49036b 100644 --- a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java +++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java @@ -1,11 +1,5 @@ package eu.ztsh.lfr.model.events; -import org.springframework.context.ApplicationEvent; - -public class FileModifiedEvent extends ApplicationEvent { - - public FileModifiedEvent(Object source) { - super(source); - } +public record FileModifiedEvent(long timestamp) { } diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java index 86ca75c..9467de7 100644 --- a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -6,7 +6,6 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEventPublisher; import java.io.IOException; @@ -23,13 +22,12 @@ class FileLoadingServiceTest { void successfulRunTest() { var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); var service = createFileLoadingService("success.csv"); - service.onFileModified(new TestEvent()); - assertThatNoException().isThrownBy(service::runTask); + assertThatNoException().isThrownBy(service::onFileModified); Mockito.verify(eventPublisher, Mockito.times(1)) .publishEvent(captor.capture()); var averages = captor.getValue().averages(); assertThat(averages).hasSize(1); - var list = averages.get("warszawa"); + var list = averages.get("Warszawa"); assertThat(list).hasSize(1); assertThat(list.getFirst()).satisfies(item -> { assertThat(item.getYear()).isEqualTo("2018"); @@ -41,8 +39,7 @@ class FileLoadingServiceTest { @Test void notExistingFileTest() { var service = createFileLoadingService("not-existing.csv"); - service.onFileModified(new TestEvent()); - assertThatThrownBy(service::runTask) + assertThatThrownBy(service::onFileModified) .isInstanceOf(FileProcessingException.class) .hasRootCauseInstanceOf(IOException.class); Mockito.verifyNoInteractions(eventPublisher); @@ -51,8 +48,7 @@ class FileLoadingServiceTest { @Test void malformedFileTest() { var service = createFileLoadingService("malformed.csv"); - service.onFileModified(new TestEvent()); - assertThatThrownBy(service::runTask) + assertThatThrownBy(service::onFileModified) .isInstanceOf(FileProcessingException.class) .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); @@ -63,12 +59,4 @@ class FileLoadingServiceTest { return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); } - private static class TestEvent extends ApplicationEvent { - - public TestEvent() { - super("TestEvent"); - } - - } - }