java.util.concurrent에 있는 invokeAll() 함수에 대해서 얘기해보고자 합니다.
ExecutorService에서 제공하는 함수로 callable list를 넘겨서 한꺼번에 수행하는 함수 입니다.
이 함수는 submit()이나 CompletionService와는 사용법이 좀 다르기에 예제로 간단히 설명해 보고자 합니다.
※아래 예제는 kotlin으로 작성 하었습니다.
invokeAll()의 간단한 사용 예제
- Callable list를 받아 전부 끝나야 함수가 종료되는 형태
- Callalbe list를 받고 timeout을 받아, 시간안에 끝나지 않으면 종료하는 형태
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Note that a completed task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list, each of which has completed
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks or any of its elements are {@code null}
* @throws RejectedExecutionException if any task cannot be
* scheduled for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a completed task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list. If the operation did not time out,
* each task will have completed. If it did time out, some
* of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
* unit are {@code null}
* @throws RejectedExecutionException if any task cannot be scheduled
* for execution
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
간략하게 정리해 보면
- future list를 반환하고 전부 끝날때까지 holding된다.
- 넘겨준 Callable list가 정상 처리되든 exception이 발생하든 완료된것으로 본다.
- 동작중에 전달받은 list가 변경되면 결과를 보장하지 않는다
- 넘겨준 list 순서대로 결과 future를 담아서 넘겨준다.
발생할 수 있는 Exception
- InterruptedException : 동작이 종료되지 않은(동작중인) task가 취소 되었을때
- NullPointerException : 함수의 param중 null이 있을때
- RejectedExecutionException : task를 pool에 넣을수 없을때.
사실 exception의 발생부분을 명확하게 알아야 예외 발생시 적절한 처리를 할수 있을것 같네요.
import java.util.concurrent.*
class InvokeAllTest {
fun test() {
val searchExecutor = Executors.newFixedThreadPool(5)
val callableList = listOf(
Callable {
Thread.sleep(100)
1
},
Callable {
Thread.sleep(200)
2
},
Callable {
Thread.sleep(300)
3
},
Callable {
Thread.sleep(400)
4
},
Callable {
Thread.sleep(500)
5
}
)
println("test1")
val futureList = searchExecutor.invokeAll(callableList, 1000, TimeUnit.MILLISECONDS)
searchExecutor.invokeAll(callableList) // 여기서 blocking됨.
println("test2")
futureList.forEachIndexed { index, result ->
println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
}
}
}
실행 결과.... |
test1 test2 index: 0 canceled: false isDone:true result: 0 index: 1 canceled: false isDone:true result: 1 index: 2 canceled: false isDone:true result: 2 index: 3 canceled: false isDone:true result: 3 index: 4 canceled: false isDone:true result: 4 |
위 코드를 수행하면
test1이 찍힌 이후에 약 500ms 이후에 test2가 찍힙니다.
이는 invokeAll()함수를 호출하는 line에서 blocking이 됨을 나타냅니다.
Future바로 넘겨주는 submit()이나, execute()와는 다릅니다.
즉 가지고 있는 list의 작업이 전부 완료되거나, timeout이 되야 blocking이 풀립니다.
Timeout 설정
import java.util.concurrent.*
fun main(args: Array) {
val searchExecutor = Executors.newFixedThreadPool(2)
val callableList = listOf(
Callable {
Thread.sleep(100)
0
},
Callable {
Thread.sleep(200)
1
},
Callable {
Thread.sleep(300)
2
},
Callable {
Thread.sleep(400)
3
},
Callable {
Thread.sleep(500)
4
}
)
println("test1")
val futureList = searchExecutor.invokeAll(callableList, 250, TimeUnit.MILLISECONDS)
println("test2")
futureList.forEachIndexed { index, result ->
try {
println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
} catch (e: CancellationException) {
println("CancellationException")
}
}
}
실행결과... |
test1 test2 index: 0 canceled: false isDone:true result: 0 index: 1 canceled: false isDone:true result: 1 CancellationException CancellationException CancellationException |
Task에 문제가 있는 경우
fun main(args: Array) {
val searchExecutor = Executors.newFixedThreadPool(2)
val callableList = listOf(
...
Callable {
val aaa: String? = null
aaa!!.length // 강제로 NPE 발생시킴
Thread.sleep(400)
3
},
...
)
println("test1")
val futureList = searchExecutor.invokeAll(callableList, 1000, TimeUnit.MILLISECONDS)
println("test2")
futureList.forEachIndexed { index, result ->
try {
println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
} catch (e: CancellationException) {
println("CancellationException")
} catch (e: ExecutionException) {
println("ExecutionException")
}
}
}
실행결과... |
test1 test2 index: 0 canceled: false isDone:true result: 0 index: 1 canceled: false isDone:true result: 1 index: 2 canceled: false isDone:true result: 2 ExecutionException index: 4 canceled: false isDone:true result: 4 |
위 코드에는 catch에 ExecutionException을 추가했습니다.
이때. ExecutionException이 발생한 index3의 canceled 값은 false 입니다.
shutdown()에 대한처리
그럼 중간에 executor를 shutdown 하면 어떻게 될까요?
150ms 후에 강제로 shutdown 하는 코드를 넣어 봅니다.
fun main(args: Array) {
val searchExecutor = Executors.newFixedThreadPool(2)
val callableList = listOf(
...
)
Thread(
{
Thread.sleep(150)
searchExecutor.shutdownNow() //강제 shutdown
}
).start()
println("test1")
val futureList = searchExecutor.invokeAll(callableList, 1000, TimeUnit.MILLISECONDS)
println("test2")
futureList.forEachIndexed { index, result ->
try {
println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
} catch (e: InterruptedException) {
println("InterruptedException")
} catch (e: CancellationException) {
println("CancellationException")
} catch (e: ExecutionException) {
println("ExecutionException")
}
}
}
이럴경우 완료되지 못한 작업들은 ExecutionException이 발생합니다.
실행결과... |
test1 test2 index: 0 canceled: false isDone:true result: 0 ExecutionException ExecutionException ExecutionException ExecutionException |
이때. shutdown으로 인하여 ExecutionException이 발생한 index1,2,3의 canceled 값은 false 입니다.
정리
여기서 중요한 점은 task의 동작에 문제가 발생하는 경우 invokeAll()에서는 exception이 발생하지 않습니다.
invokeAll()의 결과로 받은 Future를 get()할때 exception이 발생 합니다.
따라서 try - catch는 Future.get()부분을 감싸야 합니다.
또한 CancellationException의 발생을 막기 위해서는 future.isCancelled()를 먼저 호출하여 cancel여부를 먼저 확인한 후에 future.get()를 하는
코드가 좀더 깔끔해 보입니다.
아니면 CancellationException을 try - catch에 추가하여 따로 처리해도 됩니다.
또한 shutdown()이나, 내부 에러로 인한 ExecutionException 발생시 isCancelled()의 값은 false 입니다.
timeout으로 작업이 취소된 경우에만 isCancelled() 값이 true로 반환됩니다.
'개발이야기 > Java' 카테고리의 다른 글
IntelliJ에서 jar 생성 (0) | 2019.09.18 |
---|---|
Java 8 Lambda를 이용한 Decorate pattern (0) | 2017.12.15 |
Java 8 Lambda를 이용한 Builder pattern (0) | 2017.12.14 |
Java 8 Lambda를 이용한 lazy evaluation (0) | 2017.12.13 |
Java 8 Lambda를 이용한 virtual proxy pattern (0) | 2017.12.12 |