fix: Threading & optimizations
This commit is contained in:
parent
6d71245d9f
commit
f5ec500343
4 changed files with 142 additions and 142 deletions
|
@ -7,12 +7,11 @@ import android.bluetooth.BluetoothSocket
|
|||
import android.util.Log
|
||||
import com.mapbox.navigation.tripdata.maneuver.model.Maneuver
|
||||
import eu.ztsh.garmin.data.DataCache
|
||||
import eu.ztsh.garmin.data.GarminManeuver
|
||||
import eu.ztsh.garmin.data.GarminMapper
|
||||
import eu.ztsh.garmin.data.GarminModelItem
|
||||
import eu.ztsh.garmin.data.MapboxMapper
|
||||
import java.io.IOException
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.SynchronousQueue
|
||||
|
||||
@SuppressLint("MissingPermission")
|
||||
|
@ -23,88 +22,98 @@ class Garmin(
|
|||
) {
|
||||
|
||||
private lateinit var connection: ConnectThread
|
||||
private val observer = ProcessingThreadObserver()
|
||||
private lateinit var maneuvers: ManeuverProcessingThread
|
||||
private val cache = DataCache()
|
||||
|
||||
private val maneuversPool = Executors.newFixedThreadPool(4)
|
||||
|
||||
fun start() {
|
||||
connection = ConnectThread()
|
||||
connection.start()
|
||||
|
||||
maneuvers = ManeuverProcessingThread()
|
||||
maneuvers.start()
|
||||
}
|
||||
|
||||
fun close() {
|
||||
connection.close()
|
||||
|
||||
maneuvers.interrupt()
|
||||
maneuvers.join(0)
|
||||
|
||||
maneuversPool.shutdown()
|
||||
}
|
||||
|
||||
fun process(maneuver: Maneuver) {
|
||||
ManeuverProcessingThread(maneuver).start()
|
||||
maneuversPool.submit{maneuvers.enqueue(maneuver)}
|
||||
}
|
||||
|
||||
// fun process(location: LocationMatcherResult) {
|
||||
// LocationProcessingThread(location).start()
|
||||
// }
|
||||
private inner class ManeuverProcessingThread : ProcessingThread<Maneuver>() {
|
||||
|
||||
// fun process(navigationSessionState: NavigationSessionState) {
|
||||
// cache.update(navigationSessionState)
|
||||
// }
|
||||
|
||||
private inner class ManeuverProcessingThread(val maneuver: Maneuver) : ProcessingThread<GarminManeuver>() {
|
||||
|
||||
override fun process(): GarminManeuver? {
|
||||
if (cache.hasChanged(maneuver)) {
|
||||
cache.update(maneuver)
|
||||
return MapboxMapper.map(maneuver)
|
||||
override fun mapAndSend(maybeItem: Maneuver?): Maneuver? {
|
||||
if (maybeItem != null) {
|
||||
Log.d(TAG, "mapAndSend (${currentThread().name}): got new")
|
||||
var changed = false
|
||||
if (cache.hasChanged(maybeItem.laneGuidance)) {
|
||||
changed = true
|
||||
Log.d(TAG, "mapAndSend: lanes")
|
||||
send(GarminMapper.map(MapboxMapper.asLanes(maybeItem)))
|
||||
}
|
||||
if (cache.hasChanged(maybeItem.stepDistance)) {
|
||||
changed = true
|
||||
Log.d(TAG, "mapAndSend: stepDistance")
|
||||
send(GarminMapper.map(MapboxMapper.asDistance(maybeItem)))
|
||||
}
|
||||
if (cache.hasChanged(maybeItem.primary)) {
|
||||
changed = true
|
||||
Log.d(TAG, "mapAndSend: primary")
|
||||
send(GarminMapper.map(MapboxMapper.asDirection(maybeItem)))
|
||||
}
|
||||
if (changed) {
|
||||
return maybeItem
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
override fun enqueue(item: GarminManeuver) {
|
||||
if (cache.hasChanged(item.lanes)) {
|
||||
connection.enqueue(GarminMapper.map(item.lanes))
|
||||
}
|
||||
if (cache.hasChanged(item.direction)) {
|
||||
connection.enqueue(GarminMapper.map(item.direction))
|
||||
}
|
||||
if (cache.hasChanged(item.distance)) {
|
||||
connection.enqueue(GarminMapper.map(item.distance))
|
||||
}
|
||||
// flag?
|
||||
override fun updateCache(item: Maneuver) {
|
||||
cache.update(item)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// private inner class LocationProcessingThread(val location: LocationMatcherResult) : ProcessingThread() {
|
||||
//
|
||||
// override fun process() {
|
||||
// if (cache.hasChanged(location)) {
|
||||
// cache.update(location)
|
||||
// send(MapboxMapper.apply(location))
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
private abstract inner class ProcessingThread<T> : Thread() {
|
||||
|
||||
private abstract inner class ProcessingThread<T : GarminModelItem> : Thread() {
|
||||
private val queue: SynchronousQueue<T> = SynchronousQueue()
|
||||
private var stop: Boolean = false
|
||||
|
||||
abstract fun process(): T?
|
||||
abstract fun mapAndSend(maybeItem: T?): T?
|
||||
|
||||
abstract fun enqueue(item: T)
|
||||
abstract fun updateCache(item: T)
|
||||
|
||||
fun send(data: IntArray) {
|
||||
connection.enqueue(data)
|
||||
}
|
||||
|
||||
fun enqueue(item: T) {
|
||||
queue.put(item)
|
||||
}
|
||||
|
||||
override fun run() {
|
||||
val processing = process()
|
||||
if (processing != null) {
|
||||
enqueue(processing)
|
||||
cache.update(processing)
|
||||
while (!stop) {
|
||||
val maybeItem = queue.poll()
|
||||
val item = mapAndSend(maybeItem)
|
||||
if (item != null) {
|
||||
Log.d(TAG, "run: Cache updated")
|
||||
updateCache(item)
|
||||
}
|
||||
}
|
||||
observer.join(this)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private inner class ProcessingThreadObserver {
|
||||
|
||||
fun <T : GarminModelItem> join(thread: ProcessingThread<T>) {
|
||||
thread.join()
|
||||
override fun interrupt() {
|
||||
stop = true
|
||||
super.interrupt()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private inner class ConnectThread : Thread() {
|
||||
|
@ -126,9 +135,11 @@ class Garmin(
|
|||
context.setConnectionStatus(true)
|
||||
sleep(3000)
|
||||
readAll()
|
||||
send(intArrayOf(0x07, 0x01)) // Set GPS to true
|
||||
while (true) {
|
||||
val newCurrent = queue.poll()
|
||||
if (newCurrent == null) {
|
||||
Log.d(TAG, "run (${currentThread().name}): Sleep...")
|
||||
sleep(500)
|
||||
} else {
|
||||
current = newCurrent
|
||||
|
@ -177,7 +188,6 @@ class Garmin(
|
|||
private fun sendRaw(buff: IntArray) {
|
||||
buff.forEach { socket!!.outputStream.write(it) }
|
||||
socket!!.outputStream.flush()
|
||||
sleep(2000)
|
||||
readAll()
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue