Merge pull request 'Release 1.0.0' from dev into master

This commit is contained in:
Piotr Dec 2024-07-19 01:28:59 +02:00
commit d042044e76
25 changed files with 652 additions and 17 deletions

3
.gitignore vendored
View file

@ -3,3 +3,6 @@ target/
### IntelliJ IDEA ###
.idea/
*.iml
# local files
data/

25
pom.xml
View file

@ -12,9 +12,9 @@
</parent>
<groupId>eu.ztsh</groupId>
<artifactId>spring-boot-template</artifactId>
<name>Spring Boot Standalone Template</name>
<version>1.0.0-SNAPSHOT</version>
<artifactId>large-file-reading-challenge</artifactId>
<name>Large File Reading Challenge</name>
<version>1.0.0</version>
<properties>
<!-- encoding -->
@ -30,6 +30,7 @@
<!-- dependencies -->
<!-- plugins -->
<jsonschema2pojo.version>1.2.1</jsonschema2pojo.version>
</properties>
<dependencies>
@ -66,6 +67,24 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.jsonschema2pojo</groupId>
<artifactId>jsonschema2pojo-maven-plugin</artifactId>
<version>${jsonschema2pojo.version}</version>
<configuration>
<sourceDirectory>${basedir}/schema</sourceDirectory>
<targetPackage>eu.ztsh.lfr.model</targetPackage>
<useBigDecimals>true</useBigDecimals>
<includeAdditionalProperties>false</includeAdditionalProperties>
</configuration>
<executions>
<execution>
<goals>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View file

@ -1,7 +1,8 @@
# Large file reading challenge
Welcome to the recruitment challenge.
Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in the format array of objects with the following fields: year, averageTemperature.
Write an application that, at the endpoint specified by you, returns the yearly average temperatures for a given city in
the format array of objects with the following fields: year, averageTemperature.
## Assumptions
@ -10,23 +11,41 @@ Write an application that, at the endpoint specified by you, returns the yearly
- The content of the source file may change during the application's running
## Example source file
[example_file.csv](example_file.csv)
## Response
### Schema
[response_schema](schema/response.json)
### Example
## Example response
```json
[
{
"year": "2021",
"averageTemperature": 12.1
},
{
"year": "2022",
"averageTemperature": 11.1
},
{
"year": "2023",
"averageTemperature": 14.1
}
{
"year": "2021",
"averageTemperature": 12.1
},
{
"year": "2022",
"averageTemperature": 11.1
},
{
"year": "2023",
"averageTemperature": 14.1
}
]
```
## Configuration
| property | description | default |
|---------------|--------------------------------|-----------------------|
| data.file-url | path to file with temperatures | data/temperatures.csv |
## Usage
Temperature statistics endpoint: `/api/temperatures/{city}`
Returns HTTP 200 when city is found, 404 otherwise.

25
schema/response.json Normal file
View file

@ -0,0 +1,25 @@
{
  "$id": "https://ztsh.eu/lfr/response.json",
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "type": "array",
  "$defs": {
    "average": {
      "type": "object",
      "properties": {
        "year": {
          "type": "string"
        },
        "averageTemperature": {
          "type": "number"
        }
      },
      "required": [
        "year",
        "averageTemperature"
      ]
    }
  },
  "items": {
    "$ref": "#/$defs/average"
  }
}

View file

@ -2,8 +2,16 @@ package eu.ztsh.lfr;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableConfigurationProperties
@ConfigurationPropertiesScan
@EnableAsync
@EnableScheduling
public class Main {
public static void main(String[] args) {

View file

@ -0,0 +1,8 @@
package eu.ztsh.lfr.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Configuration properties for the temperature data source.
 *
 * @param fileUrl path to the CSV file with temperatures, bound from the
 *                {@code data.file-url} property
 */
@ConfigurationProperties("data")
public record DataProperties(String fileUrl) {
}

View file

@ -0,0 +1,13 @@
package eu.ztsh.lfr.core;
import eu.ztsh.lfr.model.Average;
import jakarta.annotation.Nonnull;
import java.util.List;
/**
 * Read-side access to the computed yearly temperature averages.
 */
public interface TemperaturesService {

  /**
   * Returns the yearly averages for the given city.
   *
   * @param city city name (lookup appears case-insensitive in the
   *             implementation — confirm before relying on it)
   * @return never {@code null}; an empty list means the city is unknown
   */
  @Nonnull
  List<Average> getTemperaturesFor(String city);
}

View file

@ -0,0 +1,42 @@
package eu.ztsh.lfr.core.impl;
import eu.ztsh.lfr.core.TemperaturesService;
import eu.ztsh.lfr.model.Average;
import eu.ztsh.lfr.model.Averages;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@Service
public class TemperaturesServiceImpl implements TemperaturesService {

  private static final Logger log = LoggerFactory.getLogger(TemperaturesServiceImpl.class);

  // Latest published averages; null until the first file load completes.
  private final AtomicReference<Averages> averages = new AtomicReference<>();

  /**
   * Returns the yearly averages for the given city, blocking until the first
   * data load has been published.
   *
   * @param city city name; matched case-insensitively
   * @return never {@code null}; empty when the city is unknown, or when the
   *         waiting thread was interrupted before any data arrived
   */
  @Nonnull
  @Override
  public synchronized List<Average> getTemperaturesFor(String city) {
    while (averages.get() == null) {
      try {
        wait();
      } catch (InterruptedException e) {
        log.error("Thread was interrupted", e);
        Thread.currentThread().interrupt();
        // Bail out instead of looping: once the interrupt flag is restored,
        // the next wait() would throw InterruptedException immediately,
        // spinning forever in a tight error-logging loop.
        return List.of();
      }
    }
    return averages.get().getOrDefault(city.toLowerCase(), List.of());
  }

  /**
   * Stores freshly computed averages and wakes up all blocked readers.
   */
  @EventListener(FileProcessedEvent.class)
  public synchronized void updateTemperatures(FileProcessedEvent event) {
    averages.set(event.averages());
    notifyAll();
  }
}

View file

@ -0,0 +1,100 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.Averages;
import eu.ztsh.lfr.model.DataRow;
import eu.ztsh.lfr.model.FileProcessingException;
import eu.ztsh.lfr.model.Temperatures;
import eu.ztsh.lfr.model.events.FileModifiedEvent;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Stream;
@Service
public class FileLoadingService {

  private static final Logger log = LoggerFactory.getLogger(FileLoadingService.class);

  private final DataProperties dataProperties;
  private final ApplicationEventPublisher eventPublisher;
  // Pending reload requests; coalesced by runTask so bursts cause one reload.
  private final ConcurrentLinkedQueue<ApplicationEvent> queue = new ConcurrentLinkedQueue<>();
  // True while a reload is in progress; prevents overlapping scheduled runs.
  private final AtomicBoolean working = new AtomicBoolean();

  @Autowired
  public FileLoadingService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) {
    this.dataProperties = dataProperties;
    this.eventPublisher = eventPublisher;
  }

  /**
   * Queues a reload request. Fired on application startup and whenever the
   * watched file is reported as modified.
   */
  @Async
  @EventListener({FileModifiedEvent.class, ApplicationReadyEvent.class})
  public void onFileModified(ApplicationEvent event) {
    queue.add(event);
  }

  /**
   * Drains the queue and, if any event was pending, processes the file once
   * and publishes the result as a {@link FileProcessedEvent}.
   */
  @Scheduled(fixedRate = 500)
  void runTask() {
    if (working.get()) {
      return;
    }
    ApplicationEvent event;
    log.debug("{} events queued", queue.size());
    do {
      // process only last
      event = queue.poll();
    } while (!queue.isEmpty());
    if (event != null) {
      log.info("Processing event: {}", event.getClass().getSimpleName());
      working.set(true);
      try {
        var result = processFile();
        eventPublisher.publishEvent(new FileProcessedEvent(result));
      } finally {
        // Always release the flag: if processFile() throws (missing or
        // malformed file), leaving it set would block every future reload.
        working.set(false);
      }
    }
  }

  /**
   * Parses the whole CSV file (city;timestamp;temperature) into per-city,
   * per-year accumulators and returns the computed averages.
   *
   * @throws FileProcessingException on I/O failure or an unparseable line
   */
  private Averages processFile() {
    var start = System.currentTimeMillis();
    log.debug("Processing file");
    final Temperatures model = new Temperatures();
    try (final Stream<String> lines = Files.lines(Paths.get(dataProperties.fileUrl()))) {
      lines.parallel()
          .filter(Predicate.not(String::isBlank))
          .map(line -> {
            try {
              var parts = line.split(";");
              // Year is the leading segment of the yyyy-mm-dd timestamp.
              return new DataRow(parts[0].toLowerCase(),
                  Integer.parseInt(parts[1].split("-")[0]),
                  Double.parseDouble(parts[2]));
            } catch (Exception e) {
              throw new FileProcessingException("Error in line %s".formatted(line), e);
            }
          })
          .forEach(model::addData);
    } catch (IOException e) {
      throw new FileProcessingException("File error", e);
    }
    var millis = System.currentTimeMillis() - start;
    log.atInfo().setMessage("Elapsed time: {}:{}").addArgument(TimeUnit.MILLISECONDS.toMinutes(millis))
        .addArgument(() -> String.format("%02d", TimeUnit.MILLISECONDS.toSeconds(millis) % 60))
        .log();
    return model.getAverages();
  }
}

View file

@ -0,0 +1,77 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.events.FileModifiedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
/**
 * Watches the parent directory of the configured data file and publishes a
 * {@link FileModifiedEvent} whenever that file is created or modified.
 */
@Service
public class WatcherService {

  private static final Logger log = LoggerFactory.getLogger(WatcherService.class);

  private final WatchService watchService;
  private final ApplicationEventPublisher eventPublisher;
  // Bare file name of the watched file, used to filter directory events.
  private final String fileName;

  /**
   * Registers a watch on the directory containing the data file.
   *
   * @throws IOException if the watch service cannot be created or registered
   */
  @Autowired
  public WatcherService(DataProperties dataProperties, ApplicationEventPublisher eventPublisher) throws IOException {
    this.eventPublisher = eventPublisher;
    this.watchService = FileSystems.getDefault().newWatchService();
    var filePath = Paths.get(dataProperties.fileUrl());
    fileName = filePath.getFileName().toString();
    filePath.getParent().register(
        watchService,
        // register file creation as some editors modify files in temp and then replace the original file.
        StandardWatchEventKinds.ENTRY_CREATE,
        StandardWatchEventKinds.ENTRY_MODIFY
    );
  }

  /**
   * Blocks on the watch service forever, forwarding events for the watched
   * file as {@link FileModifiedEvent}s.
   * NOTE(review): this listener never returns while the app runs, so it
   * occupies the thread delivering ApplicationReadyEvent — confirm intended.
   *
   * @throws InterruptedException if interrupted while taking a key or sleeping
   */
  @SuppressWarnings("BusyWait")
  @EventListener(ApplicationReadyEvent.class)
  public void watch() throws InterruptedException {
    WatchKey key;
    while ((key = watchService.take()) != null) {
      /*
      When modifying in IntelliJ, one line edit ends with:
      ENTRY_CREATE: temperatures.csv~
      ENTRY_MODIFY: temperatures.csv~
      ENTRY_MODIFY: temperatures.csv~
      ENTRY_MODIFY: temperatures.csv
      ENTRY_MODIFY: temperatures.csv
      ENTRY_MODIFY: temperatures.csv
      Thread.sleep is used to filter out duplicated events.
      origin: https://stackoverflow.com/a/25221600
      */
      Thread.sleep(50);
      // Keep only the last event that targets the watched file name.
      var maybeLastEvent = key.pollEvents().stream()
          .filter(event -> event.context() != null)
          .filter(event -> event.context().toString().endsWith(fileName))
          .reduce((o1, o2) -> o2);
      if (maybeLastEvent.isPresent()) {
        var lastEvent = maybeLastEvent.get();
        log.info("Got event of kind:{}", lastEvent.kind());
        eventPublisher.publishEvent(new FileModifiedEvent(this));
      } else {
        log.trace("Got event for not watched file");
      }
      // Re-arm the key so further events keep being delivered.
      key.reset();
    }
  }
}

View file

@ -0,0 +1,13 @@
package eu.ztsh.lfr.model;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Map of city name to the list of yearly averages computed for that city.
 * NOTE(review): extends HashMap directly, exposing the full mutable Map API;
 * composition (or Map.copyOf) would be safer if the type can be changed.
 */
public class Averages extends HashMap<String, List<Average>> {

  public Averages(Map<String, List<Average>> m) {
    super(m);
  }
}

View file

@ -0,0 +1,5 @@
package eu.ztsh.lfr.model;
/**
 * A single parsed measurement line.
 *
 * @param city        city the measurement belongs to
 * @param year        year extracted from the measurement timestamp
 * @param temperature measured temperature value
 */
public record DataRow(String city, int year, double temperature) {
}

View file

@ -0,0 +1,9 @@
package eu.ztsh.lfr.model;
/**
 * Unchecked exception signalling that the temperatures file could not be read
 * or that one of its lines could not be parsed.
 */
public class FileProcessingException extends RuntimeException {

  public FileProcessingException(String message, Throwable cause) {
    super(message, cause);
  }
}

View file

@ -0,0 +1,33 @@
package eu.ztsh.lfr.model;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
 * Concurrent accumulation structure: city -> (year -> accumulator).
 * Populated in parallel by the file-loading stream.
 */
public class Temperatures extends ConcurrentHashMap<String, ConcurrentMap<Integer, YearDataAccumulator>> {

  /**
   * Registers a single measurement under its city and year.
   */
  public void addData(DataRow dataRow) {
    // computeIfAbsent is atomic and only allocates when the key is missing,
    // unlike the previous putIfAbsent + get sequence, which created a
    // throwaway map on every call and paired two separate map operations.
    computeIfAbsent(dataRow.city(), city -> new ConcurrentHashMap<>())
        .computeIfAbsent(dataRow.year(), year -> new YearDataAccumulator())
        .add(dataRow.temperature());
  }

  /**
   * Snapshots the accumulated data as per-city lists of yearly averages.
   */
  public Averages getAverages() {
    var computed = entrySet().stream()
        .map(entry -> {
          var yearlyAverages = entry.getValue().entrySet().stream()
              .map(yearEntry -> {
                var average = new Average();
                average.setYear(yearEntry.getKey().toString());
                average.setAverageTemperature(yearEntry.getValue().getAverage());
                return average;
              })
              .toList();
          return Map.entry(entry.getKey(), yearlyAverages);
        })
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    return new Averages(computed);
  }
}

View file

@ -0,0 +1,25 @@
package eu.ztsh.lfr.model;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAdder;
/**
 * Accumulates temperature samples for one city-year bucket and computes
 * their mean.
 */
public class YearDataAccumulator {

  private final DoubleAdder sum = new DoubleAdder();
  private final AtomicInteger counter = new AtomicInteger();

  /**
   * Adds a single measurement. DoubleAdder and AtomicInteger are each
   * thread-safe, so the previous synchronized block was redundant (and was
   * not paired with any synchronization on the read side anyway).
   */
  public void add(double value) {
    sum.add(value);
    counter.incrementAndGet();
  }

  /**
   * Returns the mean of all added values, rounded half-up to two decimals.
   * Returns 0.00 when no values have been added, instead of throwing
   * ArithmeticException on division by zero.
   */
  public BigDecimal getAverage() {
    int count = counter.get();
    if (count == 0) {
      return BigDecimal.ZERO.setScale(2, RoundingMode.HALF_UP);
    }
    // Round exactly once, after dividing. The previous code rounded the sum
    // to two decimals first and then divided, which double-rounds:
    // e.g. 10.005 / 2 came out as 5.01 instead of 5.00.
    return BigDecimal.valueOf(sum.sum())
        .divide(BigDecimal.valueOf(count), 2, RoundingMode.HALF_UP);
  }
}

View file

@ -0,0 +1,11 @@
package eu.ztsh.lfr.model.events;
import org.springframework.context.ApplicationEvent;
/**
 * Published by the file watcher when the observed data file is created or
 * modified; listened to in order to trigger a reload.
 */
public class FileModifiedEvent extends ApplicationEvent {

  public FileModifiedEvent(Object source) {
    super(source);
  }
}

View file

@ -0,0 +1,7 @@
package eu.ztsh.lfr.model.events;
import eu.ztsh.lfr.model.Averages;
/**
 * Published after the data file has been fully parsed.
 *
 * @param averages the freshly computed per-city yearly averages
 */
public record FileProcessedEvent(Averages averages) {
}

View file

@ -0,0 +1,32 @@
package eu.ztsh.lfr.web;
import eu.ztsh.lfr.core.TemperaturesService;
import eu.ztsh.lfr.model.Average;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * REST endpoint exposing yearly average temperatures per city.
 */
@RestController
@RequestMapping("/api/temperatures")
public class TemperaturesController {

  private final TemperaturesService temperaturesService;

  @Autowired
  public TemperaturesController(TemperaturesService temperaturesService) {
    this.temperaturesService = temperaturesService;
  }

  /**
   * Returns HTTP 200 with the list of yearly averages for the city,
   * or HTTP 404 when the city is unknown.
   */
  @GetMapping("/{city}")
  public ResponseEntity<List<Average>> getTemperatures(@PathVariable String city) {
    var averages = temperaturesService.getTemperaturesFor(city);
    if (averages.isEmpty()) {
      return ResponseEntity.notFound().build();
    }
    return ResponseEntity.ok(averages);
  }
}

View file

@ -0,0 +1,2 @@
data:
file-url: data/temperatures.csv

View file

View file

@ -0,0 +1,74 @@
package eu.ztsh.lfr.core.impl.files;
import eu.ztsh.lfr.config.DataProperties;
import eu.ztsh.lfr.model.FileProcessingException;
import eu.ztsh.lfr.model.events.FileProcessedEvent;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import java.io.IOException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
 * Unit tests for FileLoadingService, using a mocked event publisher and
 * small CSV fixtures from src/test/resources.
 */
class FileLoadingServiceTest {

  private final ApplicationEventPublisher eventPublisher = Mockito.mock(ApplicationEventPublisher.class);

  // Happy path: one city, one year, average published exactly once.
  @Test
  void successfulRunTest() {
    var captor = ArgumentCaptor.forClass(FileProcessedEvent.class);
    var service = createFileLoadingService("success.csv");
    service.onFileModified(new TestEvent());
    assertThatNoException().isThrownBy(service::runTask);
    Mockito.verify(eventPublisher, Mockito.times(1))
        .publishEvent(captor.capture());
    var averages = captor.getValue().averages();
    assertThat(averages).hasSize(1);
    // City keys are stored lower-cased by the parser.
    var list = averages.get("warszawa");
    assertThat(list).hasSize(1);
    assertThat(list.getFirst()).satisfies(item -> {
      assertThat(item.getYear()).isEqualTo("2018");
      // 29.2775, but it is BigDecimal with scale 2 and round half up
      assertThat(item.getAverageTemperature().doubleValue()).isEqualTo(29.28);
    });
  }

  // Missing file surfaces as FileProcessingException wrapping an IOException.
  @Test
  void notExistingFileTest() {
    var service = createFileLoadingService("not-existing.csv");
    service.onFileModified(new TestEvent());
    assertThatThrownBy(service::runTask)
        .isInstanceOf(FileProcessingException.class)
        .hasRootCauseInstanceOf(IOException.class);
    Mockito.verifyNoInteractions(eventPublisher);
  }

  // A line missing its second semicolon fails parsing with the offending line in the message.
  @Test
  void malformedFileTest() {
    var service = createFileLoadingService("malformed.csv");
    service.onFileModified(new TestEvent());
    assertThatThrownBy(service::runTask)
        .isInstanceOf(FileProcessingException.class)
        .hasMessage("Error in line Warszawa;2018-09-21 11:17:42.31833.27")
        .hasRootCauseInstanceOf(ArrayIndexOutOfBoundsException.class);
    Mockito.verifyNoInteractions(eventPublisher);
  }

  // Builds a service pointed at a fixture file under src/test/resources.
  private FileLoadingService createFileLoadingService(String path) {
    return new FileLoadingService(new DataProperties("src/test/resources/" + path), eventPublisher);
  }

  // Minimal concrete ApplicationEvent used to enqueue a reload request.
  private static class TestEvent extends ApplicationEvent {

    public TestEvent() {
      super("TestEvent");
    }
  }
}

View file

@ -0,0 +1,4 @@
Warszawa;2018-09-19 05:17:32.619;9.97
Warszawa;2018-09-20 18:44:42.468;39.02
Warszawa;2018-09-21 11:17:42.31833.27
Warszawa;2018-09-22 19:25:07.568;34.85
1 Warszawa;2018-09-19 05:17:32.619;9.97
2 Warszawa;2018-09-20 18:44:42.468;39.02
3 Warszawa;2018-09-21 11:17:42.31833.27
4 Warszawa;2018-09-22 19:25:07.568;34.85

View file

@ -0,0 +1,4 @@
Warszawa;2018-09-19 05:17:32.619;9.97
Warszawa;2018-09-20 18:44:42.468;39.02
Warszawa;2018-09-21 11:17:42.318;33.27
Warszawa;2018-09-22 19:25:07.568;34.85
1 Warszawa 2018-09-19 05:17:32.619 9.97
2 Warszawa 2018-09-20 18:44:42.468 39.02
3 Warszawa 2018-09-21 11:17:42.318 33.27
4 Warszawa 2018-09-22 19:25:07.568 34.85

View file

@ -0,0 +1,2 @@
# Appends a single blank line to the data file — presumably a cheap way to
# trigger the application's file watcher (ENTRY_MODIFY) without changing any
# measurements. TODO confirm intended use.
with open('../data/temperatures.csv', 'a', encoding='utf-8') as target:
    target.write("\n")

100
util/test-generator.py Normal file
View file

@ -0,0 +1,100 @@
"""
Create data in city;yyyy-mm-dd HH:mm:ss.SSS;temp format
Avg line length: ~36 bytes
Expected output file size: >3 GB -> 3 000 000 000 bytes
Needed lines count = ~83.(3)m
66 district cities * 75 years with measurement twice per hour = 66*75*365*24*2 = 86 724 000 -> ~3.1 GB
"""
import datetime
import numpy as np
cities = [
"Biała Podlaska",
"Białystok",
"Bielsko-Biała",
"Bydgoszcz",
"Bytom",
"Chełm",
"Chorzów",
"Częstochowa",
"Dąbrowa Górnicza",
"Elbląg",
"Gdańsk",
"Gdynia",
"Gliwice",
"Gorzów Wielkopolski",
"Grudziądz",
"Jastrzębie-Zdrój",
"Jaworzno",
"Jelenia Góra",
"Kalisz",
"Katowice",
"Kielce",
"Konin",
"Koszalin",
"Kraków",
"Krosno",
"Legnica",
"Leszno",
"Lublin",
"Łomża",
"Łódź",
"Mysłowice",
"Nowy Sącz",
"Olsztyn",
"Opole",
"Ostrołęka",
"Piekary Śląskie",
"Piotrków Trybunalski",
"Płock",
"Poznań",
"Przemyśl",
"Radom",
"Ruda Śląska",
"Rybnik",
"Rzeszów",
"Siedlce",
"Siemianowice Śląskie",
"Skierniewice",
"Słupsk",
"Sopot",
"Sosnowiec",
"Suwałki",
"Szczecin",
"Świętochłowice",
"Świnoujście",
"Tarnobrzeg",
"Tarnów",
"Toruń",
"Tychy",
"Wałbrzych",
"Włocławek",
"Wrocław",
"Zabrze",
"Zamość",
"Zielona Góra",
"Żory"
]
begin_date = datetime.datetime(year=1949, month=1, day=1, hour=0, minute=0, second=0)
end_date = begin_date + datetime.timedelta(days=365 * 75)
generator = np.random.default_rng(790492283396)
batch = iter(generator.integers(low=-1500, high=3500, size=66*75*365*24*2))
start = datetime.datetime.now()
with open('../data/temperatures.csv', 'w', encoding='utf-8') as target:
for city in cities:
print(city)
now = begin_date
while now < end_date:
target.write("{};{}.000;{}\n".format(city, now, int(next(batch)) / 100.0))
now += datetime.timedelta(minutes=30)
end = datetime.datetime.now()
print("Completed in {}".format(end - start))