diff --git a/.gitignore b/.gitignore index a471319..f9b4200 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ target/ ### IntelliJ IDEA ### .idea/ *.iml + +# local files +data/ diff --git a/readme.md b/readme.md index 14ca349..b2c770f 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,8 @@ # Large file reading challenge Welcome in the recruitment challenge. -Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in the format array of objects with the following fields: year, averageTemperature. +Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in +the format array of objects with the following fields: year, averageTemperature. ## Assumptions @@ -10,33 +11,40 @@ Write an application that, at the endpoint specified by you, returns the yearly - The content of the source file may change during the application's running ## Example source file -[example_file.csv](example_file.csv) +[example_file.csv](example_file.csv) ## Response ### Schema + [response_schema](schema/response.json) ### Example ```json [ - { - "year": "2021", - "averageTemperature": 12.1 - }, - { - "year": "2022", - "averageTemperature": 11.1 - }, - { - "year": "2023", - "averageTemperature": 14.1 - } + { + "year": "2021", + "averageTemperature": 12.1 + }, + { + "year": "2022", + "averageTemperature": 11.1 + }, + { + "year": "2023", + "averageTemperature": 14.1 + } ] ``` +## Configuration + +| property | description | default | +|---------------|--------------------------------|-----------------------| +| data.file-url | path to file with temperatures | data/temperatures.csv | + ## Usage Temperature statistics endpoint: `/api/temperatures/{city}` diff --git a/schema/response.json b/schema/response.json index 335cfe1..527a9e3 100644 --- a/schema/response.json +++ b/schema/response.json @@ -3,7 +3,7 @@ "$schema": "http://json-schema.org/draft/2020-12/schema", "type": "array", "def": { - "entry": { + "average": { "type": "object", "properties": { "year": { @@ -20,6 +20,6 @@ } }, "items": { - "$ref": "#/def/entry" + "$ref": "#/def/average" } } diff --git a/src/main/java/eu/ztsh/lfr/Main.java b/src/main/java/eu/ztsh/lfr/Main.java index 1f3f728..ca05716 100644 --- a/src/main/java/eu/ztsh/lfr/Main.java +++ b/src/main/java/eu/ztsh/lfr/Main.java @@ -2,8 +2,14 @@ package eu.ztsh.lfr; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication +@EnableConfigurationProperties +@ConfigurationPropertiesScan +@EnableAsync public class Main { public static void main(String[] args) { diff --git a/src/main/java/eu/ztsh/lfr/config/DataProperties.java b/src/main/java/eu/ztsh/lfr/config/DataProperties.java new file mode 100644 index 0000000..152bc52 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/config/DataProperties.java @@ -0,0 +1,8 @@ +package eu.ztsh.lfr.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("data") +public record DataProperties(String fileUrl) { + +} diff --git a/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java index 1ee66e3..70c1c3d 100644 --- a/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java +++ b/src/main/java/eu/ztsh/lfr/core/TemperaturesService.java @@ -1,6 +1,6 @@ package eu.ztsh.lfr.core; -import eu.ztsh.lfr.model.Entry; +import eu.ztsh.lfr.model.Average; import jakarta.annotation.Nonnull; import java.util.List; @@ -8,6 +8,6 @@ import java.util.List; public interface TemperaturesService { @Nonnull - List getTemperaturesFor(String city); + List getTemperaturesFor(String city); } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java index 79b2f27..8654b16 100644 --- a/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java +++ b/src/main/java/eu/ztsh/lfr/core/impl/TemperaturesServiceImpl.java @@ -1,7 +1,7 @@ package eu.ztsh.lfr.core.impl; import eu.ztsh.lfr.core.TemperaturesService; -import eu.ztsh.lfr.model.Entry; +import eu.ztsh.lfr.model.Average; import jakarta.annotation.Nonnull; import org.springframework.stereotype.Service; @@ -12,7 +12,7 @@ public class TemperaturesServiceImpl implements TemperaturesService { @Nonnull @Override - public List getTemperaturesFor(String city) { + public List getTemperaturesFor(String city) { throw new UnsupportedOperationException("Not supported yet."); } diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java new file mode 100644 index 0000000..274bd3a --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/FileLoadingService.java @@ -0,0 +1,65 @@ +package eu.ztsh.lfr.core.impl.files; + +import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.DataRow; +import eu.ztsh.lfr.model.FileProcessingException; +import eu.ztsh.lfr.model.Temperatures; +import eu.ztsh.lfr.model.events.FileModifiedEvent; +import eu.ztsh.lfr.model.events.FileProcessedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Stream; + +@Service +public class FileLoadingService { + + private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class); + private final DataProperties dataProperties; + private final ApplicationEventPublisher eventPublisher; + + @Autowired + public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) { + this.dataProperties = dataProperties; + this.eventPublisher = eventPublisher; + } + + @Async + @EventListener(FileModifiedEvent.class) + public void onFileModified() { + var start = System.currentTimeMillis(); + log.debug("Processing file"); + final Temperatures model = new Temperatures(); + try (final Stream lines = Files.lines(Paths.get(dataProperties.fileUrl()))) { + lines.parallel() + .filter(Predicate.not(String::isBlank)) + .map(line -> { + try { + var parts = line.split(";"); + return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2])); + } catch (Exception e) { + throw new FileProcessingException("Error in line %s".formatted(line), e); + } + }) + .forEach(model::addData); + } catch (IOException e) { + throw new FileProcessingException("File error", e); + } + var millis = System.currentTimeMillis() - start; + log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis)) + .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60)) + .log(); + eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages())); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java new file mode 100644 index 0000000..a6030a9 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/core/impl/files/WatcherService.java @@ -0,0 +1,77 @@ +package eu.ztsh.lfr.core.impl.files; + +import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.events.FileModifiedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; + +@Service +public class WatcherService { + + private static final Logger log = LoggerFactory.getLogger(WatcherService.class); + + private final WatchService watchService; + private final ApplicationEventPublisher eventPublisher; + + private final String fileName; + + @Autowired + public WatcherService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) throws IOException { + this.eventPublisher = eventPublisher; + this.watchService = FileSystems.getDefault().newWatchService(); + + var filePath = Paths.get(dataProperties.fileUrl()); + fileName = filePath.getFileName().toString(); + filePath.getParent().register( + watchService, + // register file creation as some editors modify files in temp, and then replaces original file. + StandardWatchEventKinds.ENTRY_CREATE, + StandardWatchEventKinds.ENTRY_MODIFY + ); + } + + @SuppressWarnings("BusyWait") + @EventListener(ApplicationReadyEvent.class) + public void watch() throws InterruptedException { + WatchKey key; + while ((key = watchService.take()) != null) { + /* + When modifying in IntelliJ, one line edit ends with: + ENTRY_CREATE: temperatures.csv~ + ENTRY_MODIFY: temperatures.csv~ + ENTRY_MODIFY: temperatures.csv~ + ENTRY_MODIFY: temperatures.csv + ENTRY_MODIFY: temperatures.csv + ENTRY_MODIFY: temperatures.csv + Thread.sleep is used to filter out duplicated events. + origin: https://stackoverflow.com/a/25221600 + */ + Thread.sleep(50); + var maybeLastEvent = key.pollEvents().stream() + .filter(event -> event.context() != null) + .filter(event -> event.context().toString().endsWith(fileName)) + .reduce((o1, o2) -> o2); + if (maybeLastEvent.isPresent()) { + var lastEvent = maybeLastEvent.get(); + log.info("Got event of kind:{}", lastEvent.kind()); + eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis())); + } else { + log.trace("Got event for not watched file"); + } + key.reset(); + } + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/Averages.java b/src/main/java/eu/ztsh/lfr/model/Averages.java new file mode 100644 index 0000000..118fe61 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/Averages.java @@ -0,0 +1,13 @@ +package eu.ztsh.lfr.model; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class Averages extends HashMap> { + + public Averages(Map> m) { + super(m); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/DataRow.java b/src/main/java/eu/ztsh/lfr/model/DataRow.java new file mode 100644 index 0000000..43ac596 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/DataRow.java @@ -0,0 +1,5 @@ +package eu.ztsh.lfr.model; + +public record DataRow(String city, int year, double temperature) { + +} diff --git a/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java new file mode 100644 index 0000000..eb54b52 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/FileProcessingException.java @@ -0,0 +1,9 @@ +package eu.ztsh.lfr.model; + +public class FileProcessingException extends RuntimeException { + + public FileProcessingException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/Temperatures.java b/src/main/java/eu/ztsh/lfr/model/Temperatures.java new file mode 100644 index 0000000..6916811 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/Temperatures.java @@ -0,0 +1,33 @@ +package eu.ztsh.lfr.model; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +public class Temperatures extends ConcurrentHashMap> { + + public void addData(DataRow dataRow) { + putIfAbsent(dataRow.city(), new ConcurrentHashMap<>()); + var city = get(dataRow.city()); + city.putIfAbsent(dataRow.year(), new YearDataAccumulator()); + city.get(dataRow.year()).add(dataRow.temperature()); + } + + public Averages getAverages() { + var computed = entrySet().stream() + .map(entry -> { + var newEntry = entry.getValue().entrySet().stream() + .map(yearEntry -> { + var average = new Average(); + average.setYear(yearEntry.getKey().toString()); + average.setAverageTemperature(yearEntry.getValue().getAverage()); + return average; + }) + .toList(); + return Map.entry(entry.getKey(), newEntry); + }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new Averages(computed); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java new file mode 100644 index 0000000..2b58474 --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/YearDataAccumulator.java @@ -0,0 +1,25 @@ +package eu.ztsh.lfr.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +public class YearDataAccumulator { + + private final DoubleAdder sum = new DoubleAdder(); + private final AtomicInteger counter = new AtomicInteger(); + + public void add(double value) { + synchronized (this) { + sum.add(value); + counter.incrementAndGet(); + } + } + + public BigDecimal getAverage() { + return BigDecimal.valueOf(sum.sum()).setScale(2, RoundingMode.HALF_UP) + .divide(BigDecimal.valueOf(counter.get()), RoundingMode.HALF_UP); + } + +} diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java new file mode 100644 index 0000000..c49036b --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/events/FileModifiedEvent.java @@ -0,0 +1,5 @@ +package eu.ztsh.lfr.model.events; + +public record FileModifiedEvent(long timestamp) { + +} diff --git a/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java new file mode 100644 index 0000000..9e5ab8f --- /dev/null +++ b/src/main/java/eu/ztsh/lfr/model/events/FileProcessedEvent.java @@ -0,0 +1,7 @@ +package eu.ztsh.lfr.model.events; + +import eu.ztsh.lfr.model.Averages; + +public record FileProcessedEvent(Averages averages) { + +} diff --git a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java index 52a2685..39b583d 100644 --- a/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java +++ b/src/main/java/eu/ztsh/lfr/web/TemperaturesController.java @@ -1,7 +1,7 @@ package eu.ztsh.lfr.web; import eu.ztsh.lfr.core.TemperaturesService; -import eu.ztsh.lfr.model.Entry; +import eu.ztsh.lfr.model.Average; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; @@ -23,7 +23,7 @@ public class TemperaturesController { } @GetMapping("/{city}") - public ResponseEntity> getTemperatures(@PathVariable String city) { + public ResponseEntity> getTemperatures(@PathVariable String city) { var data = temperaturesService.getTemperaturesFor(city); return data.isEmpty() ? ResponseEntity.notFound().build() : ResponseEntity.ok(data); diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index e69de29..8008f0f 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -0,0 +1,2 @@ +data: + file-url: data/temperatures.csv diff --git a/src/test/java/.gitkeep b/src/test/java/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java new file mode 100644 index 0000000..9467de7 --- /dev/null +++ b/src/test/java/eu/ztsh/lfr/core/impl/files/FileLoadingServiceTest.java @@ -0,0 +1,62 @@ +package eu.ztsh.lfr.core.impl.files; + +import eu.ztsh.lfr.config.DataProperties; +import eu.ztsh.lfr.model.FileProcessingException; +import eu.ztsh.lfr.model.events.FileProcessedEvent; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.springframework.context.ApplicationEventPublisher; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +class FileLoadingServiceTest { + + private final ApplicationEventPublisher eventPublisher = Mockito.mock(ApplicationEventPublisher.class); + + @Test + void successfulRunTest() { + var captor = ArgumentCaptor.forClass(FileProcessedEvent.class); + var service = createFileLoadingService("success.csv"); + assertThatNoException().isThrownBy(service::onFileModified); + Mockito.verify(eventPublisher, Mockito.times(1)) + .publishEvent(captor.capture()); + var averages = captor.getValue().averages(); + assertThat(averages).hasSize(1); + var list = averages.get("Warszawa"); + assertThat(list).hasSize(1); + assertThat(list.getFirst()).satisfies(item -> { + assertThat(item.getYear()).isEqualTo("2018"); + // 29.2775, but it is BigDecimal with scale 2 and round half up + assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28); + }); + } + + @Test + void notExistingFileTest() { + var service = createFileLoadingService("not-existing.csv"); + assertThatThrownBy(service::onFileModified) + .isInstanceOf(FileProcessingException.class) + .hasRootCauseInstanceOf(IOException.class); + Mockito.verifyNoInteractions(eventPublisher); + } + + @Test + void malformedFileTest() { + var service = createFileLoadingService("malformed.csv"); + assertThatThrownBy(service::onFileModified) + .isInstanceOf(FileProcessingException.class) + .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27") + .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class); + Mockito.verifyNoInteractions(eventPublisher); + } + + private FileLoadingService createFileLoadingService(String path) { + return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher); + } + +} diff --git a/src/test/resources/malformed.csv b/src/test/resources/malformed.csv new file mode 100644 index 0000000..b634989 --- /dev/null +++ b/src/test/resources/malformed.csv @@ -0,0 +1,4 @@ +Warszawa;2018-09-19 05:17:32.619;9.97 +Warszawa;2018-09-20 18:44:42.468;39.02 +Warszawa;2018-09-21 11:17:42.31833.27 +Warszawa;2018-09-22 19:25:07.568;34.85 diff --git a/src/test/resources/success.csv b/src/test/resources/success.csv new file mode 100644 index 0000000..7a4d8c5 --- /dev/null +++ b/src/test/resources/success.csv @@ -0,0 +1,4 @@ +Warszawa;2018-09-19 05:17:32.619;9.97 +Warszawa;2018-09-20 18:44:42.468;39.02 +Warszawa;2018-09-21 11:17:42.318;33.27 +Warszawa;2018-09-22 19:25:07.568;34.85 diff --git a/util/change-event-creator.py b/util/change-event-creator.py new file mode 100644 index 0000000..5512773 --- /dev/null +++ b/util/change-event-creator.py @@ -0,0 +1,2 @@ +with open('../data/temperatures.csv', 'a', encoding='utf-8') as target: + target.write("\n") diff --git a/util/test-generator.py b/util/test-generator.py new file mode 100644 index 0000000..517245c --- /dev/null +++ b/util/test-generator.py @@ -0,0 +1,100 @@ +""" +Create data in city;yyyy-mm-dd HH:mm:ss.SSS;temp format + +Avg line length: ~36 bytes +Expected output file size: >3 GB -> 3 000 000 000 bytes + +Needed lines count = ~83.(3)m + +66 district cities * 75 years with measurement twice per hour = 66*75*365*24*2 = 86 724 000 -> ~3.1 GB +""" + +import datetime +import numpy as np + +cities = [ + "Biała Podlaska", + "Białystok", + "Bielsko-Biała", + "Bydgoszcz", + "Bytom", + "Chełm", + "Chorzów", + "Częstochowa", + "Dąbrowa Górnicza", + "Elbląg", + "Gdańsk", + "Gdynia", + "Gliwice", + "Gorzów Wielkopolski", + "Grudziądz", + "Jastrzębie-Zdrój", + "Jaworzno", + "Jelenia Góra", + "Kalisz", + "Katowice", + "Kielce", + "Konin", + "Koszalin", + "Kraków", + "Krosno", + "Legnica", + "Leszno", + "Lublin", + "Łomża", + "Łódź", + "Mysłowice", + "Nowy Sącz", + "Olsztyn", + "Opole", + "Ostrołęka", + "Piekary Śląskie", + "Piotrków Trybunalski", + "Płock", + "Poznań", + "Przemyśl", + "Radom", + "Ruda Śląska", + "Rybnik", + "Rzeszów", + "Siedlce", + "Siemianowice Śląskie", + "Skierniewice", + "Słupsk", + "Sopot", + "Sosnowiec", + "Suwałki", + "Szczecin", + "Świętochłowice", + "Świnoujście", + "Tarnobrzeg", + "Tarnów", + "Toruń", + "Tychy", + "Wałbrzych", + "Włocławek", + "Wrocław", + "Zabrze", + "Zamość", + "Zielona Góra", + "Żory" +] + +begin_date = datetime.datetime(year=1949, month=1, day=1, hour=0, minute=0, second=0) +end_date = begin_date + datetime.timedelta(days=365 * 75) + + +generator = np.random.default_rng(790492283396) +batch = iter(generator.integers(low=-1500, high=3500, size=66*75*365*24*2)) + +start = datetime.datetime.now() +with open('../data/temperatures.csv', 'w', encoding='utf-8') as target: + for city in cities: + print(city) + now = begin_date + while now < end_date: + target.write("{};{}.000;{}\n".format(city, now, int(next(batch)) / 100.0)) + now += datetime.timedelta(minutes=30) + +end = datetime.datetime.now() +print("Completed in {}".format(end - start))