Merge pull request 'feat: Files watching and processing services' from feat/files into dev

This commit is contained in:
Piotr Dec 2024-07-18 19:23:04 +02:00
commit 770ccf63a5
24 changed files with 460 additions and 22 deletions

3
.gitignore vendored
View file

@ -3,3 +3,6 @@ target/
### IntelliJ IDEA ###
.idea/
*.iml
# local files
data/

View file

@ -1,7 +1,8 @@
# Large file reading challenge
Welcome in the recruitment challenge.
Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in the format array of objects with the following fields: year, averageTemperature.
Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in
the format array of objects with the following fields: year, averageTemperature.
## Assumptions
@ -10,12 +11,13 @@ Write an application that, at the endpoint specified by you, returns the yearly
- The content of the source file may change during the application's running
## Example source file
[example_file.csv](example_file.csv)
[example_file.csv](example_file.csv)
## Response
### Schema
[response_schema](schema/response.json)
### Example
@ -37,6 +39,12 @@ Write an application that, at the endpoint specified by you, returns the yearly
]
```
## Configuration
| property | description | default |
|---------------|--------------------------------|-----------------------|
| data.file-url | path to file with temperatures | data/temperatures.csv |
## Usage
Temperature statistics endpoint: `/api/temperatures/{city}`

View file

@ -3,7 +3,7 @@
"$schema": "http://json-schema.org/draft/2020-12/schema",
"type": "array",
"def": {
"entry": {
"average": {
"type": "object",
"properties": {
"year": {
@ -20,6 +20,6 @@
}
},
"items": {
"$ref": "#/def/entry"
"$ref": "#/def/average"
}
}

View file

@ -2,8 +2,14 @@ package eu.ztsh.lfr;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableConfigurationProperties
@ConfigurationPropertiesScan
@EnableAsync
public class Main {
public static void main(String[] args) {

View file

@ -0,0 +1,8 @@
package eu.ztsh.lfr.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("data")
public record DataProperties(String fileUrl) {
}

View file

@ -1,6 +1,6 @@
package eu.ztsh.lfr.core;
import eu.ztsh.lfr.model.Entry;
import eu.ztsh.lfr.model.Average;
import jakarta.annotation.Nonnull;
import java.util.List;
@ -8,6 +8,6 @@ import java.util.List;
public interface TemperaturesService {
@Nonnull
List<Entry> getTemperaturesFor(String city);
List<Average> getTemperaturesFor(String city);
}

View file

@ -1,7 +1,7 @@
package eu.ztsh.lfr.core.impl;
import eu.ztsh.lfr.core.TemperaturesService;
import eu.ztsh.lfr.model.Entry;
import eu.ztsh.lfr.model.Average;
import jakarta.annotation.Nonnull;
import org.springframework.stereotype.Service;
@ -12,7 +12,7 @@ public class TemperaturesServiceImpl implements TemperaturesService {
@Nonnull
@Override
public List<Entry> getTemperaturesFor(String city) {
public List<Average> getTemperaturesFor(String city) {
throw new UnsupportedOperationException("Not supported yet.");
}

View file

@ -0,0 +1,65 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.DataRow;
import eu.ztsh.lfr.model.FileProcessingException;
import eu.ztsh.lfr.model.Temperatures;
import eu.ztsh.lfr.model.events.FileModifiedEvent;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
@Service
public class FileLoadingService {
private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class);
private final DataProperties dataProperties;
private final ApplicationEventPublisher eventPublisher;
@Autowired
public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) {
this.dataProperties = dataProperties;
this.eventPublisher = eventPublisher;
}
@Async
@EventListener(FileModifiedEvent.class)
public void onFileModified() {
var start = System.currentTimeMillis();
log.debug("Processing file");
final Temperatures model = new Temperatures();
try (final Stream<String> lines = Files.lines(Paths.get(dataProperties.fileUrl()))) {
lines.parallel()
.filter(Predicate.not(String::isBlank))
.map(line -> {
try {
var parts = line.split(";");
return new DataRow(parts[0], Integer.parseInt(parts[1].split("-")[0]), Double.parseDouble(parts[2]));
} catch (Exception e) {
throw new FileProcessingException("Error in line %s".formatted(line), e);
}
})
.forEach(model::addData);
} catch (IOException e) {
throw new FileProcessingException("File error", e);
}
var millis = System.currentTimeMillis() - start;
log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis))
.addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60))
.log();
eventPublisher.publishEvent(new FileProcessedEvent(model.getAverages()));
}
}

View file

@ -0,0 +1,77 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.events.FileModifiedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
@Service
public class WatcherService {
private static final Logger log = LoggerFactory.getLogger(WatcherService.class);
private final WatchService watchService;
private final ApplicationEventPublisher eventPublisher;
private final String fileName;
@Autowired
public WatcherService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) throws IOException {
this.eventPublisher = eventPublisher;
this.watchService = FileSystems.getDefault().newWatchService();
var filePath = Paths.get(dataProperties.fileUrl());
fileName = filePath.getFileName().toString();
filePath.getParent().register(
watchService,
// register file creation as some editors modify files in temp, and then replaces original file.
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY
);
}
@SuppressWarnings("BusyWait")
@EventListener(ApplicationReadyEvent.class)
public void watch() throws InterruptedException {
WatchKey key;
while ((key = watchService.take()) != null) {
/*
When modifying in IntelliJ, one line edit ends with:
ENTRY_CREATE: temperatures.csv~
ENTRY_MODIFY: temperatures.csv~
ENTRY_MODIFY: temperatures.csv~
ENTRY_MODIFY: temperatures.csv
ENTRY_MODIFY: temperatures.csv
ENTRY_MODIFY: temperatures.csv
Thread.sleep is used to filter out duplicated events.
origin: https://stackoverflow.com/a/25221600
*/
Thread.sleep(50);
var maybeLastEvent = key.pollEvents().stream()
.filter(event -> event.context() != null)
.filter(event -> event.context().toString().endsWith(fileName))
.reduce((o1, o2) -> o2);
if (maybeLastEvent.isPresent()) {
var lastEvent = maybeLastEvent.get();
log.info("Got event of kind:{}", lastEvent.kind());
eventPublisher.publishEvent(new FileModifiedEvent(System.currentTimeMillis()));
} else {
log.trace("Got event for not watched file");
}
key.reset();
}
}
}

View file

@ -0,0 +1,13 @@
package eu.ztsh.lfr.model;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class Averages extends HashMap<String, List<Average>> {
public Averages(Map<String, List<Average>> m) {
super(m);
}
}

View file

@ -0,0 +1,5 @@
package eu.ztsh.lfr.model;
public record DataRow(String city, int year, double temperature) {
}

View file

@ -0,0 +1,9 @@
package eu.ztsh.lfr.model;
public class FileProcessingException extends RuntimeException {
public FileProcessingException(String message, Throwable cause) {
super(message, cause);
}
}

View file

@ -0,0 +1,33 @@
package eu.ztsh.lfr.model;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
public class Temperatures extends ConcurrentHashMap<String, ConcurrentMap<Integer, YearDataAccumulator>> {
public void addData(DataRow dataRow) {
putIfAbsent(dataRow.city(), new ConcurrentHashMap<>());
var city = get(dataRow.city());
city.putIfAbsent(dataRow.year(), new YearDataAccumulator());
city.get(dataRow.year()).add(dataRow.temperature());
}
public Averages getAverages() {
var computed = entrySet().stream()
.map(entry -> {
var newEntry = entry.getValue().entrySet().stream()
.map(yearEntry -> {
var average = new Average();
average.setYear(yearEntry.getKey().toString());
average.setAverageTemperature(yearEntry.getValue().getAverage());
return average;
})
.toList();
return Map.entry(entry.getKey(), newEntry);
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
return new Averages(computed);
}
}

View file

@ -0,0 +1,25 @@
package eu.ztsh.lfr.model;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAdder;
public class YearDataAccumulator {
private final DoubleAdder sum = new DoubleAdder();
private final AtomicInteger counter = new AtomicInteger();
public void add(double value) {
synchronized (this) {
sum.add(value);
counter.incrementAndGet();
}
}
public BigDecimal getAverage() {
return BigDecimal.valueOf(sum.sum()).setScale(2, RoundingMode.HALF_UP)
.divide(BigDecimal.valueOf(counter.get()), RoundingMode.HALF_UP);
}
}

View file

@ -0,0 +1,5 @@
package eu.ztsh.lfr.model.events;
public record FileModifiedEvent(long timestamp) {
}

View file

@ -0,0 +1,7 @@
package eu.ztsh.lfr.model.events;
import eu.ztsh.lfr.model.Averages;
public record FileProcessedEvent(Averages averages) {
}

View file

@ -1,7 +1,7 @@
package eu.ztsh.lfr.web;
import eu.ztsh.lfr.core.TemperaturesService;
import eu.ztsh.lfr.model.Entry;
import eu.ztsh.lfr.model.Average;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
@ -23,7 +23,7 @@ public class TemperaturesController {
}
@GetMapping("/{city}")
public ResponseEntity<List<Entry>> getTemperatures(@PathVariable String city) {
public ResponseEntity<List<Average>> getTemperatures(@PathVariable String city) {
var data = temperaturesService.getTemperaturesFor(city);
return data.isEmpty() ? ResponseEntity.notFound().build() : ResponseEntity.ok(data);

View file

@ -0,0 +1,2 @@
data:
file-url: data/temperatures.csv

View file

View file

@ -0,0 +1,62 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.FileProcessingException;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.context.ApplicationEventPublisher;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
class FileLoadingServiceTest {
private final ApplicationEventPublisher eventPublisher = Mockito.mock(ApplicationEventPublisher.class);
@Test
void successfulRunTest() {
var captor = ArgumentCaptor.forClass(FileProcessedEvent.class);
var service = createFileLoadingService("success.csv");
assertThatNoException().isThrownBy(service::onFileModified);
Mockito.verify(eventPublisher, Mockito.times(1))
.publishEvent(captor.capture());
var averages = captor.getValue().averages();
assertThat(averages).hasSize(1);
var list = averages.get("Warszawa");
assertThat(list).hasSize(1);
assertThat(list.getFirst()).satisfies(item -> {
assertThat(item.getYear()).isEqualTo("2018");
// 29.2775, but it is BigDecimal with scale 2 and round half up
assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28);
});
}
@Test
void notExistingFileTest() {
var service = createFileLoadingService("not-existing.csv");
assertThatThrownBy(service::onFileModified)
.isInstanceOf(FileProcessingException.class)
.hasRootCauseInstanceOf(IOException.class);
Mockito.verifyNoInteractions(eventPublisher);
}
@Test
void malformedFileTest() {
var service = createFileLoadingService("malformed.csv");
assertThatThrownBy(service::onFileModified)
.isInstanceOf(FileProcessingException.class)
.hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27")
.hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class);
Mockito.verifyNoInteractions(eventPublisher);
}
private FileLoadingService createFileLoadingService(String path) {
return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher);
}
}

View file

@ -0,0 +1,4 @@
Warszawa;2018-09-19 05:17:32.619;9.97
Warszawa;2018-09-20 18:44:42.468;39.02
Warszawa;2018-09-21 11:17:42.31833.27
Warszawa;2018-09-22 19:25:07.568;34.85
1 Warszawa;2018-09-19 05:17:32.619;9.97
2 Warszawa;2018-09-20 18:44:42.468;39.02
3 Warszawa;2018-09-21 11:17:42.31833.27
4 Warszawa;2018-09-22 19:25:07.568;34.85

View file

@ -0,0 +1,4 @@
Warszawa;2018-09-19 05:17:32.619;9.97
Warszawa;2018-09-20 18:44:42.468;39.02
Warszawa;2018-09-21 11:17:42.318;33.27
Warszawa;2018-09-22 19:25:07.568;34.85
1 Warszawa 2018-09-19 05:17:32.619 9.97
2 Warszawa 2018-09-20 18:44:42.468 39.02
3 Warszawa 2018-09-21 11:17:42.318 33.27
4 Warszawa 2018-09-22 19:25:07.568 34.85

View file

@ -0,0 +1,2 @@
with open('../data/temperatures.csv', 'a', encoding='utf-8') as target:
target.write("\n")

100
util/test-generator.py Normal file
View file

@ -0,0 +1,100 @@
"""
Create data in city;yyyy-mm-dd HH:mm:ss.SSS;temp format
Avg line length: ~36 bytes
Expected output file size: >3 GB -> 3 000 000 000 bytes
Needed lines count = ~83.(3)m
66 district cities * 75 years with measurement twice per hour = 66*75*365*24*2 = 86 724 000 -> ~3.1 GB
"""
import datetime
import numpy as np
cities = [
"Biała Podlaska",
"Białystok",
"Bielsko-Biała",
"Bydgoszcz",
"Bytom",
"Chełm",
"Chorzów",
"Częstochowa",
"Dąbrowa Górnicza",
"Elbląg",
"Gdańsk",
"Gdynia",
"Gliwice",
"Gorzów Wielkopolski",
"Grudziądz",
"Jastrzębie-Zdrój",
"Jaworzno",
"Jelenia Góra",
"Kalisz",
"Katowice",
"Kielce",
"Konin",
"Koszalin",
"Kraków",
"Krosno",
"Legnica",
"Leszno",
"Lublin",
"Łomża",
"Łódź",
"Mysłowice",
"Nowy Sącz",
"Olsztyn",
"Opole",
"Ostrołęka",
"Piekary Śląskie",
"Piotrków Trybunalski",
"Płock",
"Poznań",
"Przemyśl",
"Radom",
"Ruda Śląska",
"Rybnik",
"Rzeszów",
"Siedlce",
"Siemianowice Śląskie",
"Skierniewice",
"Słupsk",
"Sopot",
"Sosnowiec",
"Suwałki",
"Szczecin",
"Świętochłowice",
"Świnoujście",
"Tarnobrzeg",
"Tarnów",
"Toruń",
"Tychy",
"Wałbrzych",
"Włocławek",
"Wrocław",
"Zabrze",
"Zamość",
"Zielona Góra",
"Żory"
]
begin_date = datetime.datetime(year=1949, month=1, day=1, hour=0, minute=0, second=0)
end_date = begin_date + datetime.timedelta(days=365 * 75)
generator = np.random.default_rng(790492283396)
batch = iter(generator.integers(low=-1500, high=3500, size=66*75*365*24*2))
start = datetime.datetime.now()
with open('../data/temperatures.csv', 'w', encoding='utf-8') as target:
for city in cities:
print(city)
now = begin_date
while now < end_date:
target.write("{};{}.000;{}\n".format(city, now, int(next(batch)) / 100.0))
now += datetime.timedelta(minutes=30)
end = datetime.datetime.now()
print("Completed in {}".format(end - start))