多嗜睡理发师问题 (kotlin 多线程 / coroutine)
理发师问题
一个理发师, 一把理发椅, n 把等候理发的顾客椅子, 如果没有顾客则理发师便在理发椅上睡觉 , 当有一个顾客到达时, 首先看理发师在干什么, 如果理发师在睡觉, 则唤醒理发师理发, 如果理发师正在理发, 则查看是否有空的顾客椅子可坐, 如果有, 坐下等待, 如果没有, 则离开.
简化成如下步骤:
理发师等待顾客
顾客尝试进入等待区 (人满则离开)
(step0) 理发师招待顾客
(step1) 顾客响应
(step2) 理发师理发到完成
(step3) 顾客离开
(step4) 理发师结束, 继续 step0 招待新顾客
要求:
每个顾客一个线程, 每个理发师一个线程
每个步骤都打印
确保 2-7 是顺序执行的
1 个理发师, 多个顾客
思考下该怎么实现呢?
单理发师多线程版
为了保证 2-7 的顺序执行, 我们只需要让 3-7 每个步骤一个信号量, 前一个步骤结束时释放下一个步骤的信号量.
kotlin 例子如下:abstractclassSleepingBarberSimulator(val seats:Int){
companionobject{
constval BARBER_PREP_SEC=3
constval BARBER_WORK_SEC=8
constval BARBER_REST_SEC=3
constval BARBER_WORK_VAR=2
}
privateval rand=Random()
abstractfun newBarber()
abstractfun newCustomer()
abstractfun joinAllCustomers()
protectedfun logBarber(msg:String)=println("=".repeat(100)+"Barber"+msg)
protectedfun barberWorkSpeed()=BARBER_WORK_SEC-BARBER_WORK_VAR+rand.nextInt(BARBER_WORK_VAR*2)
}
classMultiThreadSleepingBarberSimulator(seats:Int):SleepingBarberSimulator(seats){
privateval bid=AtomInt()
privateval cid=AtomInt()
privateval steps=(0until10).map{Semaph(0)}
privateval freeSeats=AtomInt(seats)
privateval repliedCustomers=ConcurrentDeque()
// val repliedCustomers = ConcurrentLinkedDeque()
privateval barberCustomerSemas=Hashtable>()
val customerThreads=LinkedList()
overridefun newBarber(){
val id=bid.inc()
val workSpeed=barberWorkSpeed()
Thread{
logBarber("$id comes and make some prepare")
sleep(BARBER_PREP_SEC)
while(true){
steps[0].acquire()
logBarber("$id asking for a customer")
steps[1].release()
steps[2].acquire()
val customer=repliedCustomers.poll()
logBarber("$id working on customer $customer")
sleep(workSpeed)
logBarber("$id done on customer $customer")
steps[3].release()
steps[4].acquire()
logBarber("$id has a rest")
sleep(BARBER_REST_SEC)
barberCustomerSemas.remove(customer)
}
}.apply{start()}
}
overridefun newCustomer(){
val id=cid.inc()
customerThreads.add(Thread{
println("---Customer $id comes")
if(freeSeats.dec()<0){
println("!!!!!!!!!!!!Customer $id has no seat and leaves")
freeSeats.inc()
return@Thread
}
println("---Customer $id sits and waits")
steps[0].release()
steps[1].acquire()
println("---Customer $id responds, stands up and is served")
freeSeats.inc()
repliedCustomers.add(id)
barberCustomerSemas[id]=Semaph(0)toSemaph(0)
steps[2].release()
steps[3].release()
println("---Customer $id done and leaves")
steps[4].release()
}.apply{start()})
}
overridefun joinAllCustomers()=customerThreads.forEach{it.join()}
privatefun sleep(nSecond:Int)=Thread.sleep(nSecond*1000L)
}
多个理发师
区别在于我们此时需要对理发师和顾客进行配对, 也意味着是从 step2 开始不能再完全共享信号量, 而是按照配对情况来共享.privateval barberCustomerSemas=Hashtable>()
overridefun newBarber(){
val id=bid.inc()
val workSpeed=barberWorkSpeed()
Thread{
logBarber("$id comes and make some prepare")
sleep(BARBER_PREP_SEC)
while(true){
steps[0].acquire()
logBarber("$id asking for a customer")
steps[1].release()
steps[2].acquire()
val customer=repliedCustomers.poll()
logBarber("$id working on customer $customer")
sleep(workSpeed)
logBarber("$id done on customer $customer")
val(cs,bs)=barberCustomerSemas[customer]!!
cs.release()
bs.acquire()
logBarber("$id has a rest")
sleep(BARBER_REST_SEC)
barberCustomerSemas.remove(customer)
}
}.apply{start()}
}
overridefun newCustomer(){
val id=cid.inc()
customerThreads.add(Thread{
println("---Customer $id comes")
if(freeSeats.dec()<0){
println("!!!!!!!!!!!!Customer $id has no seat and leaves")
freeSeats.inc()
return@Thread
}
println("---Customer $id sits and waits")
steps[0].release()
steps[1].acquire()
println("---Customer $id responds, stands up and is served")
freeSeats.inc()
repliedCustomers.add(id)
barberCustomerSemas[id]=Semaph(0)toSemaph(0)
steps[2].release()
val(cs,bs)=barberCustomerSemas[id]!!
cs.acquire()
println("---Customer $id done and leaves")
bs.release()
}.apply{start()})
}
单线程 coroutine 版
(单线程)coroutine 让实现变得更简单, 因为在单线程下共享状态, 不需要担心 CPU 缓存不一致, 不需要加锁来保证原子性.
每个 customer/barber 一个 coroutine,coroutine 间的同步我们可以用 channel 来完成.
这里只需要每个 customer/barber 一个 ReceiveChannel(相当于一个 actor, 可以用 unbuffered(bufferSize=1) Channel 来使得 send 和 receive 都等待),
同步时往配对的 channel 里 send/receive.classCoroutineSleepingBarberSimulator(seats:Int):SleepingBarberSimulator(seats){// by single thread coroutine
varbid=0
varcid=0
val scope=CoroutineScope(newSingleThreadContext("sleep barbers"))
val customerJobs=LinkedList()
val seatedCustomers=Chan()
val customerChans=mutableMapOf>()
val barberChans=mutableMapOf>()
overridefun newBarber(){
val id=++bid
val workSpeed=barberWorkSpeed()
scope.launch{
logBarber("$id comes and make some prepare")
sleep(BARBER_PREP_SEC)
while(true){
val customer=seatedCustomers.receive()
val customerChan=customerChans[customer]!!
val chan=Chan(1)
barberChans[id]=chan
logBarber("$id asking for a customer")
customerChan.send(id)
chan.receive()
logBarber("$id working on customer $customer")
sleep(workSpeed)
logBarber("$id done on customer $customer")
customerChan.send(0)
chan.receive()
logBarber("$id has a rest")
sleep(BARBER_REST_SEC)
barberChans.remove(id)
}
}
}
overridefun newCustomer(){
val id=++cid
customerJobs.add(scope.launch{
println("---Customer $id comes")
if(seatedCustomers.size()>=seats){
println("!!!!!!!!!!!!Customer $id has no seat and leaves")
return@launch
}
println("---Customer $id sits and waits")
val chan=Chan(1)
customerChans[id]=chan
seatedCustomers.send(id)
val barber=chan.receive()
val barberChan=barberChans[barber]!!
println("---Customer $id responds, stands up and is served by barber $barber")
barberChan.send(0)
chan.receive()
println("---Customer $id done and leaves")
customerChans.remove(id)
barberChan.send(0)
})
}
overridefun joinAllCustomers()=runBlocking{customerJobs.forEach{it.join()}}
privatesuspend fun sleep(nSecond:Int)=delay(nSecond*1000L)
}
附加问题: 打印 sleep/awake
对于 step0: 要让理发师在没有顾客时打印 "sleep", 并且如果 sleep 那么唤醒时需要额外打印 "awake".
那么该如何实现?References
https://en.wikipedia.org/wiki/Sleeping_barber_problem
完整代码参见: https://github.com/davidhuangdw/kotlin.concurrent/blob/master/src/main/kotlin/examples/sleeping_barbers.kt
来源: http://www.jianshu.com/p/5ef4613e4317
与本文相关文章