Compare commits

..

No commits in common. "master" and "feat/files" have entirely different histories.

7 changed files with 13 additions and 91 deletions

View file

@ -14,7 +14,7 @@
<groupId>eu.ztsh</groupId> <groupId>eu.ztsh</groupId>
<artifactId>large-file-reading-challenge</artifactId> <artifactId>large-file-reading-challenge</artifactId>
<name>Large File Reading Challenge</name> <name>Large File Reading Challenge</name>
<version>1.0.0</version> <version>1.0.0-SNAPSHOT</version>
<properties> <properties>
<!-- encoding --> <!-- encoding -->

View file

@ -5,13 +5,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication @SpringBootApplication
@EnableConfigurationProperties @EnableConfigurationProperties
@ConfigurationPropertiesScan @ConfigurationPropertiesScan
@EnableAsync @EnableAsync
@EnableScheduling
public class Main { public class Main {
public static void main(String[] args) { public static void main(String[] args) {

View file

@ -2,41 +2,18 @@ package eu.ztsh.lfr.core.impl;
import eu.ztsh.lfr.core.TemperaturesService; import eu.ztsh.lfr.core.TemperaturesService;
import eu.ztsh.lfr.model.Average; import eu.ztsh.lfr.model.Average;
import eu.ztsh.lfr.model.Averages;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import jakarta.annotation.Nonnull; import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@Service @Service
public class TemperaturesServiceImpl implements TemperaturesService { public class TemperaturesServiceImpl implements TemperaturesService {
private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class);
private final AtomicReference<Averages> averages = new AtomicReference<>();
@Nonnull @Nonnull
@Override @Override
public synchronized List<Average> getTemperaturesFor(String city) { public List<Average> getTemperaturesFor(String city) {
while (averages.get() == null) { throw new UnsupportedOperationException("Not supported yet.");
try {
wait();
} catch (InterruptedException e) {
log.error("Thread was interrupted", e);
Thread.currentThread().interrupt();
}
}
return averages.get().getOrDefault(city.toLowerCase(), List.of());
}
@EventListener(FileProcessedEvent.class)
public synchronized void updateTemperatures(FileProcessedEvent event) {
averages.set(event.averages());
notifyAll();
} }
} }

View file

@ -1,7 +1,6 @@
package eu.ztsh.lfr.core.impl.files; package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties; import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.Averages;
import eu.ztsh.lfr.model.DataRow; import eu.ztsh.lfr.model.DataRow;
import eu.ztsh.lfr.model.FileProcessingException; import eu.ztsh.lfr.model.FileProcessingException;
import eu.ztsh.lfr.model.Temperatures; import eu.ztsh.lfr.model.Temperatures;
@ -10,20 +9,15 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -34,9 +28,6 @@ public class FileLoadingService {
private final DataProperties dataProperties; private final DataProperties dataProperties;
private final ApplicationEventPublisher eventPublisher; private final ApplicationEventPublisher eventPublisher;
private final ConcurrentLinkedQueue<ApplicationEvent> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean working = new AtomicBoolean();
@Autowired @Autowired
public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) {
this.dataProperties = dataProperties; this.dataProperties = dataProperties;
@ -44,32 +35,8 @@ public class FileLoadingService {
} }
@Async @Async
@EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class}) @EventListener(FileModifiedEvent.class)
public void onFileModified(ApplicationEvent event) { public void onFileModified() {
queue.add(event);
}
@Scheduled(fixedRate=500)
void runTask() {
if (working.get()) {
return;
}
ApplicationEvent event;
log.debug("{} events queued", queue.size());
do {
// process only last
event = queue.poll();
} while (!queue.isEmpty());
if (event != null) {
log.info("Processing event: {}", event.getClass().getSimpleName());
working.set(true);
var result = processFile();
eventPublisher.publishEvent(new FileProcessedEvent(result));
working.set(false);
}
}
private Averages processFile() {
var start = System.currentTimeMillis(); var start = System.currentTimeMillis();
log.debug("Processing file"); log.debug("Processing file");
final Temperatures model = new Temperatures(); final Temperatures model = new Temperatures();
@ -79,9 +46,7 @@ public class FileLoadingService {
.map(line -> { .map(line -> {
try { try {
var parts = line.split(";"); var parts = line.split(";");
return new DataRow(parts[0].toLowerCase(), return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2]));
Integer.parseInt(parts[1].split("-")[0]),
Double.parseDouble(parts[2]));
} catch (Exception e) { } catch (Exception e) {
throw new FileProcessingException("Error in line %s".formatted(line), e); throw new FileProcessingException("Error in line %s".formatted(line), e);
} }
@ -94,7 +59,7 @@ public class FileLoadingService {
log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis))
.addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60))
.log(); .log();
return model.getAverages(); eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages()));
} }
} }

View file

@ -66,7 +66,7 @@ public class WatcherService {
if (maybeLastEvent.isPresent()) { if (maybeLastEvent.isPresent()) {
var lastEvent = maybeLastEvent.get(); var lastEvent = maybeLastEvent.get();
log.info("Got event of kind:{}", lastEvent.kind()); log.info("Got event of kind:{}", lastEvent.kind());
eventPublisher.publishEvent(new FileModifiedEvent(this)); eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis()));
} else { } else {
log.trace("Got event for not watched file"); log.trace("Got event for not watched file");
} }

View file

@ -1,11 +1,5 @@
package eu.ztsh.lfr.model.events; package eu.ztsh.lfr.model.events;
import org.springframework.context.ApplicationEvent; public record FileModifiedEvent(long timestamp) {
public class FileModifiedEvent extends ApplicationEvent {
public FileModifiedEvent(Object source) {
super(source);
}
} }

View file

@ -6,7 +6,6 @@ import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import java.io.IOException; import java.io.IOException;
@ -23,13 +22,12 @@ class FileLoadingServiceTest {
void successfulRunTest() { void successfulRunTest() {
var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); var captor = ArgumentCaptor.forClass(FileProcessedEvent.class);
var service = createFileLoadingService("success.csv"); var service = createFileLoadingService("success.csv");
service.onFileModified(new TestEvent()); assertThatNoException().isThrownBy(service::onFileModified);
assertThatNoException().isThrownBy(service::runTask);
Mockito.verify(eventPublisher, Mockito.times(1)) Mockito.verify(eventPublisher, Mockito.times(1))
.publishEvent(captor.capture()); .publishEvent(captor.capture());
var averages = captor.getValue().averages(); var averages = captor.getValue().averages();
assertThat(averages).hasSize(1); assertThat(averages).hasSize(1);
var list = averages.get("warszawa"); var list = averages.get("Warszawa");
assertThat(list).hasSize(1); assertThat(list).hasSize(1);
assertThat(list.getFirst()).satisfies(item -> { assertThat(list.getFirst()).satisfies(item -> {
assertThat(item.getYear()).isEqualTo("2018"); assertThat(item.getYear()).isEqualTo("2018");
@ -41,8 +39,7 @@ class FileLoadingServiceTest {
@Test @Test
void notExistingFileTest() { void notExistingFileTest() {
var service = createFileLoadingService("not-existing.csv"); var service = createFileLoadingService("not-existing.csv");
service.onFileModified(new TestEvent()); assertThatThrownBy(service::onFileModified)
assertThatThrownBy(service::runTask)
.isInstanceOf(FileProcessingException.class) .isInstanceOf(FileProcessingException.class)
.hasRootCauseInstanceOf(IOException.class); .hasRootCauseInstanceOf(IOException.class);
Mockito.verifyNoInteractions(eventPublisher); Mockito.verifyNoInteractions(eventPublisher);
@ -51,8 +48,7 @@ class FileLoadingServiceTest {
@Test @Test
void malformedFileTest() { void malformedFileTest() {
var service = createFileLoadingService("malformed.csv"); var service = createFileLoadingService("malformed.csv");
service.onFileModified(new TestEvent()); assertThatThrownBy(service::onFileModified)
assertThatThrownBy(service::runTask)
.isInstanceOf(FileProcessingException.class) .isInstanceOf(FileProcessingException.class)
.hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27")
.hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class);
@ -63,12 +59,4 @@ class FileLoadingServiceTest {
return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher);
} }
private static class TestEvent extends ApplicationEvent {
public TestEvent() {
super("TestEvent");
}
}
} }