본문으로 바로가기

Java concurrent 함수 - invokeAll()

category 개발이야기/Java 2018. 9. 19. 11:15
반응형

java.util.concurrent에 있는 invokeAll() 함수에 대해서 얘기해보고자 합니다.

ExecutorService에서 제공하는 함수로 callable list를 넘겨서 한꺼번에 수행하는 함수 입니다.

이 함수는 submit()이나 CompletionService와는 사용법이 좀 다르기에 예제로 간단히 설명해 보고자 합니다.

※아래 예제는 kotlin으로 작성 하었습니다.


invokeAll()의 간단한 사용 예제

ExecutorService에서는 submit(), execute()이외에도 다양한 함수를 제공합니다.
그중에서 테스트해 볼 함수는 invokeAll()이며 이 또한 두가지 형태가 존재합니다.

  • Callable list를 받아 전부 끝나야 함수가 종료되는 형태
  • Callalbe list를 받고 timeout을 받아, 시간안에 끝나지 않으면 종료하는 형태

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results when all complete.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Note that a completed task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param  <T> the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list, each of which has completed
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks or any of its elements are {@code null}
     * @throws RejectedExecutionException if any task cannot be
     *         scheduled for execution
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;


/**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results
     * when all complete or the timeout expires, whichever happens first.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Upon return, tasks that have not completed are cancelled.
     * Note that a completed task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list. If the operation did not time out,
     *         each task will have completed. If it did time out, some
     *         of these tasks will not have completed.
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks, any of its elements, or
     *         unit are {@code null}
     * @throws RejectedExecutionException if any task cannot be scheduled
     *         for execution
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;


간략하게 정리해 보면

- future list를 반환하고 전부 끝날때까지 holding된다.

- 넘겨준 Callable list가 정상 처리되든 exception이 발생하든 완료된것으로 본다.

- 동작중에 전달받은 list가 변경되면 결과를 보장하지 않는다

- 넘겨준 list 순서대로 결과 future를 담아서 넘겨준다.


발생할 수 있는 Exception

- InterruptedException : 동작이 종료되지 않은(동작중인) task가 취소 되었을때

- NullPointerException : 함수의 param중 null이 있을때

- RejectedExecutionException : task를 pool에 넣을수 없을때.


사실 exception의 발생부분을 명확하게 알아야 예외 발생시 적절한 처리를 할수 있을것 같네요.

 

import java.util.concurrent.*

class InvokeAllTest {
	fun test() {
		val searchExecutor = Executors.newFixedThreadPool(5)

		val callableList = listOf(
				Callable {
					Thread.sleep(100)
					1
				},
				Callable {
					Thread.sleep(200)
					2
				},
				Callable {
					Thread.sleep(300)
					3
				},
				Callable {
					Thread.sleep(400)
					4
				},
				Callable {
					Thread.sleep(500)
					5
				}
		)	

		println("test1")

		val futureList = searchExecutor.invokeAll(callableList, 1000, TimeUnit.MILLISECONDS)
		searchExecutor.invokeAll(callableList) // 여기서 blocking됨.
		println("test2")

		futureList.forEachIndexed { index, result ->
			println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")			
		}
	}
}

실행 결과.... 

test1

test2

index: 0 canceled: false isDone:true result: 0

index: 1 canceled: false isDone:true result: 1

index: 2 canceled: false isDone:true result: 2

index: 3 canceled: false isDone:true result: 3

index: 4 canceled: false isDone:true result: 4


위 코드를 수행하면

test1이 찍힌 이후에 약 500ms 이후에 test2가 찍힙니다.

이는 invokeAll()함수를 호출하는 line에서 blocking이 됨을 나타냅니다.

Future바로 넘겨주는 submit()이나, execute()와는 다릅니다.


즉 가지고 있는 list의 작업이 전부 완료되거나, timeout이 되야 blocking이 풀립니다.


Timeout 설정

두번째로는 invokeAll의 timeout을 250ms으로 바꾸고 실행해 봅니다.
250ms 안에 수행되는 작업은 모두 완료되어 표기되나 250ms 넘는 작업들은 CancellationException이 발생합니다.

import java.util.concurrent.*

fun main(args: Array) {
	val searchExecutor = Executors.newFixedThreadPool(2)

	val callableList = listOf(
			Callable {
				Thread.sleep(100)
				0
			},
			Callable {
				Thread.sleep(200)
				1
			},
			Callable {
				Thread.sleep(300)
				2
			},
			Callable {			
				Thread.sleep(400)
				3
			},
			Callable {
				Thread.sleep(500)
				4
			}
	)

	println("test1")

	val futureList = searchExecutor.invokeAll(callableList, 250, TimeUnit.MILLISECONDS)
	println("test2")

	futureList.forEachIndexed { index, result ->
		try {
			println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
		} catch (e: CancellationException) {
			println("CancellationException")
		}
	}
}

 실행결과...

test1

test2

index: 0 canceled: false isDone:true result: 0

index: 1 canceled: false isDone:true result: 1

CancellationException

CancellationException

CancellationException


Task에 문제가 있는 경우

두번째로는 Callable task 내부에 일부러 NullPointerException을 발생하도록 만들었습니다.
이럴경우 ExecutionException이 발생합니다.
fun main(args: Array) {
	val searchExecutor = Executors.newFixedThreadPool(2)

	val callableList = listOf(
			...
			Callable {			
				val aaa: String? = null
				aaa!!.length // 강제로 NPE 발생시킴
				Thread.sleep(400)
				3
			},
			...
	)

	println("test1")

	val futureList = searchExecutor.invokeAll(callableList, 1000, TimeUnit.MILLISECONDS)
	println("test2")

	futureList.forEachIndexed { index, result ->
		try {
			println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
		} catch (e: CancellationException) {
			println("CancellationException")
		} catch (e: ExecutionException) {
			println("ExecutionException")
		}
	}
}

 실행결과...

test1

test2

index: 0 canceled: false isDone:true result: 0

index: 1 canceled: false isDone:true result: 1

index: 2 canceled: false isDone:true result: 2

ExecutionException

index: 4 canceled: false isDone:true result: 4

위 코드에는 catch에 ExecutionException을 추가했습니다.

이때. ExecutionException이 발생한 index3의 canceled 값은 false 입니다.


shutdown()에 대한처리

그럼 중간에 executor를 shutdown 하면 어떻게 될까요?

150ms 후에 강제로 shutdown 하는 코드를 넣어 봅니다.

fun main(args: Array) {
	val searchExecutor = Executors.newFixedThreadPool(2)

	val callableList = listOf(
			...
	)
        
        Thread(
			{
				Thread.sleep(150)
				searchExecutor.shutdownNow() //강제 shutdown
			}
	).start()

	println("test1")

	val futureList = searchExecutor.invokeAll(callableList, 1000, TimeUnit.MILLISECONDS)
	println("test2")

	futureList.forEachIndexed { index, result ->
		try {
			println("index: $index canceled: ${result.isCancelled()} isDone:${result.isDone} result: ${result.get()}")
                } catch (e: InterruptedException) {
			println("InterruptedException")
		} catch (e: CancellationException) {
			println("CancellationException")
		} catch (e: ExecutionException) {
			println("ExecutionException")
		}
	}
}

이럴경우 완료되지 못한 작업들은 ExecutionException이 발생합니다.

실행결과... 

test1

test2

index: 0 canceled: false isDone:true result: 0

ExecutionException

ExecutionException

ExecutionException

ExecutionException

이때. shutdown으로 인하여 ExecutionException이 발생한 index1,2,3의 canceled 값은 false 입니다.


정리

여기서 중요한 점은 task의 동작에 문제가 발생하는 경우 invokeAll()에서는 exception이 발생하지 않습니다.

invokeAll()의 결과로 받은 Future를 get()할때 exception이 발생 합니다.

따라서 try - catch는 Future.get()부분을 감싸야 합니다.

또한 CancellationException의 발생을 막기 위해서는 future.isCancelled()를 먼저 호출하여 cancel여부를 먼저 확인한 후에 future.get()를 하는 

코드가 좀더 깔끔해 보입니다.

아니면 CancellationException을 try - catch에 추가하여 따로 처리해도 됩니다.

또한 shutdown()이나, 내부 에러로 인한 ExecutionException 발생시 isCancelled()의 값은 false 입니다.

timeout으로 작업이 취소된 경우에만 isCancelled() 값이 true로 반환됩니다.



반응형