Tensorpack과 Multigpu를 활용한 빠른 트레이닝 코드 작성하기

tips
tensorflow

(Curtis Kim) #1

딥러닝 모델을 개발하고, 트레이닝을 시작하고, 한참 후 트레이닝 결과를 확인하고 다시 수정하고, 이 과정을 반복하면서 원하는 모델을 만들기까지는 시간이 오래걸립니다. 이것이 문제인 이유를 설명해보면 아래와 같습니다.

모델 개발 시간보다 트레이닝 시간이 오래 걸린다.

트레이닝을 시작해두고, 다른 일을 할 수도 있겠지만, 대부분의 사람에게도 Context Switching 비용이 발생하죠. 생각보다 집중해서 여러 일을 할 수 있지 않기도 하거니와, 트레이닝이 진행되고 있는 동안 GPU 등의 Resource를 점유하고 있기 때문에, 다른 일을 하지 못할 수도 있습니다.

모델의 성능 내지는 잘못 개발된 부분에 대한 피드백이 늦어진다.

결국 이것도 개발 사이클에 대한 이야기입니다. 일주일, 이주일 트레이닝을 돌리고 나서 성능이 나오지 않으면 다시 수정된 모델로 비슷한 시간을 써야합니다. 혹시 잘못 개발된 부분이 있었다고 해도 마찬가지입니다. 개발의 한 사이클이 길어지게 되므로 전체적인 개발 사이클 역시도 증가하게 됩니다.

생각보다 GPU 등의 Resource를 최대한으로 활용하는 코드를 작성하지 못한다.

주변에서, 혹은 Github에서 공개된 코드를 봐도 Resource를 최대한으로 활용하는 코드는 아닙니다. 대부분 GPU 자원이 최대로 활용되지 않습니다. 다르게 이야기하면, 온전히 트레이닝에 자원이 쏟아지는 것이 아니라, 코드의 구조적, 설계적인 결함 때문에 유휴 시간이 발생한다는 것입니다. 예를 들어 데이터 전처리를 하는 동안 GPU 자원이 기다리고 있는다거나 하는 등의 문제입니다. 또, 비용을 투자해 좋은 인프라를 구축한 것에 대비해 큰 효용을 못거두는 경우가 있을 수 있습니다. 예를 들어 gpu 8개를 꼳은 성능 좋은 머신을 구매했는데, gpu 1개를 사용할 때 대비하여 8배의 연산 성능을 거두지 못할 수 있는 것이죠.


최근에 제가 작성하고 있는 https://github.com/ildoonet/tf-openpose 에서 Realtime Multi-Person 2D Pose Estimation using Part Affinity Fields 라는 논문을 구현하고 있습니다. 그 과정에서 위의 문제를 그대로 경험하게 되었습니다.

전처리 과정이 매우 복잡하다.

COCO Pose Dataset에서 제공된 데이터 자체는 복잡하지 않지만, 논문에서는 PAF(Part Affinity Field)와 Heatmap을 계산하여 모델을 트레이닝하기 때문에 데이터로부터 전처리 해야하는 과정이 매우 복잡하게 설계되어 있었습니다. 특히 Image Data를 Augmentation하는 부분까지도 고려해야하기 때문에 연산양은 더욱 증가하게 됩니다. 1 Thread를 활용하여 하나의 이미지를 전처리하는데에는 대략 300~400ms 수준의 시간이 필요했고, 제가 사용 중인 48Core 머신에서 초당 약 100장의 이미지 밖에 생성하지 못한다는 문제가 있었습니다. 실제 트레이닝에 필요한 자원을 고려한다면, 위 수치는 더욱 줄어들 거구요.

image

48 Core, 8 GPU 의 좋은 자원 구비

200기가의 메모리, 48 Core 급의 CPU, 24GB 메모리가 탑재된 8 GPU의 좋은 머신을 가진 상황입니다. OpenPose의 원래 Training Repository : https://github.com/ZheC/Realtime_Multi-Person_Pose_Estimation에서는 대략 2주 정도의 트레이닝 시간을 거쳤다고 하는데, 이는 Batchsize 10으로 GPU 2대를 사용했다고 합니다. 제가 Batchsize가 10으로 작은 이유를 물었더니 '8GB GPU’를 사용했기 때문에 모델이 올라갈 수 있는 한계가 Batchsize 10 이었다고 답변을 얻었습니다.

이런 상황에서 24GB 급의 GPU를 8개 가지고 있는 것은 몇가지 장점이 있을 수 있습니다.

  • Batchsize를 기존보다 훨씬 늘릴 수 있다. 동시에 빠른 속도로 처리.
  • 8개의 GPU를 사용해 더 빠르게 많은 양을 처리.

위와 같은 2가지 상황을 고려해 아래와 같이 트레이닝 코드를 Tensorflow로 재작성하기로 했습니다.

  • 클러스터링 전처리 : CPU자원이 많은 다른 서버를 추가해서, 데이터 전처리를 도와준다.
  • Multi-GPU Trainining : 8대의 GPU를 사용해 빠르게 트레이닝한다.

즉, 아래와 같은 모습으로 트레이닝 클러스터를 개발하게 됩니다.

Tensorpack, Mesos를 이용한 클러스터링 전처리

Tensorpack은 잘 알려진 Tensorflow Wrapper 입니다. 다만 저는 그 코드 내에서 데이터 Pipeline만 이용했습니다. Tensorpack을 데이터 처리 파이프라인으로 사용한 이유는 아래와 같습니다.

  1. Python Multiprocessing의 Fork 과정을 간단하면서도 아주 효과적으로 구현

Python3 부터는 내가 필요한 메모리만 효율적으로 Fork해 multiprocessing으로 여러 Core를 활용하도록 구현할 수 있습니다. Tensorpack은 데이터 전처리에 필요하다고 판단되는 요소만 적절히 fork하여 여러 cpu 자원을 사용할 수 있도록 도와줍니다.

  1. ZeroMQ를 이용한 Queue 지원

ZeroMQ를 지원하기 때문에, 처리된 데이터를 ZeroMQ에 Queue로 쌓을 수 있습니다. 이 덕분에 여러 노드에서 처리된 내용을 마스터 노드의 Queue에 받아둘 수 있습니다.

전처리 데이터 파이프라인 구현

우선 LMDB에 쌓여있는 Coco Dataset을 읽어서, 원하는 형태로 전처리하는 코드를 구현해야 합니다. 크게 보면 아래의 과정을 거칩니다.

  1. LMDB에서 데이터를 읽음

  2. Image Augmentation(Flip, Crop, Noise, …)

  3. 논문에서 소개된 Part Affinity Field, Heatmap을 계산

  4. Batch로 만듦

위 내용은 아무래도 논문의 내용과 섞여 있으므로, 간단히만 소개를 하도록 하겠습니다. LMDB로부터 데이터를 읽는다거나 Part Affinity Field, HeatMap등을 계산하는 부분은 미리 코드로 작성되어 있다고 하면, 아래와 같이 Tensorpack을 이용해서 Data Pipeline을 정의해줄 수 있습니다.

def get_dataflow(path, is_train):
    ds = CocoPoseLMDB(path, is_train)    # Read data from lmdb
    if is_train:
        ds = MapDataComponent(ds, pose_rotation)
        ds = MapDataComponent(ds, pose_flip)
        ds = MapDataComponent(ds, pose_resize_shortestedge_random)
        ds = MapDataComponent(ds, pose_crop_random)
        ds = MapData(ds, pose_to_img)
        augs = [
            imgaug.RandomApplyAug(imgaug.RandomChooseAug([
                imgaug.SaltPepperNoise(white_prob=0.01, black_prob=0.01),
                imgaug.RandomOrderAug([
                    imgaug.BrightnessScale((0.8, 1.2), clip=False),
                    imgaug.Contrast((0.8, 1.2), clip=False),
                    # imgaug.Saturation(0.4, rgb=True),
                ]),
            ]), 0.7),
        ]
        ds = AugmentImageComponent(ds, augs)
    else:
        ds = MapDataComponent(ds, pose_resize_shortestedge_fixed)
        ds = MapDataComponent(ds, pose_crop_center)
        ds = MapData(ds, pose_to_img)

    ds = PrefetchData(ds, 1000, multiprocessing.cpu_count())

    return ds

코드를 보시면, CocoPoseLMDB 라고 제가 직접 구현한 코드에서 데이터를 읽도록 되어 있고 이는 Tensorpack의 RNGDataFlow라는 것을 상속받아 구현했습니다.

얻어진 데이터를 Training의 경우에는 Rotation, Flip, Resize, Crop 등의 전처리를 거친 후에 augs 에 정의된 대로 이미지에 노이즈를 넣거나, 밝기 조절 등을 하는 Augmentation을 하도록 되어 있습니다. Training Set이 아닌 Validation Set 에서는 랜덤의 데이터 어그먼테이션 없이 Resize와 Center Crop처리만 하도록 되어 있습니다.

그렇게 해서 얻어진 Data는 다시 PrefetchData(ds, 1000, multiprocessing.cpu_count()) 를 통하도록 되어 있습니다. PrefetchData는 위에서 정의된 데이터 파이프라인을 미리 가져올 수 있도록 정의된 모듈인데, 저는 최대 1000개까지 미리 생성하고, 이는 제가 가진 core수만큼 multiprocessing을 통해 처리하도록 argument를 주었습니다.

중요한 점은, 데이터를 읽는 부분이나 rotation, flip, crop 등의 augmentation을 정의하고 이를 PrefetchData에 넘기면 필요한 부분을 여러 프로세스로 띄워서 처리해준다는 점입니다.

마찬가지로 Batchsize만큼의 데이터를 묶어주는 부분도 아래와 같이 정의했습니다.

https://github.com/ildoonet/tf-openpose/blob/master/pose_dataset.py#L318

def get_dataflow_batch(path, is_train, batchsize):
    ds = get_dataflow(path, is_train)
    ds = BatchData(ds, batchsize)
    ds = PrefetchData(ds, 10, 2)

    return ds

즉 1개씩 있던 데이터를 묶어 원하는 배치사이즈로 만들어주는데, 이 때도 역시 PrefetchData를 이용해 2개의 Process를 사용하도록 했습니다. 이렇게 하면 저의 경우 총 2*48 = 96개의 프로세스가 생성되어 제가 정의한 파이프라인대로 데이터를 준비해주게 됩니다.

이렇게 해주면 Python의 GIL에 대한 걱정을 조금 덜어낸 상태로 빠른 속도로 배치를 만들어내게 됩니다. 저의 경우 아래와 같은 성능향상이 있었습니다.

Mode 전처리된 이미지 100개 생성 시간 개선
일반적인 Python Code 201s 1x
Tensorpack Version with 48 core 5s 40x

ZeroMQ를 사용한 전처리 클러스터링 구축

저는 여기에 조금 더 보태어서, 위의 처리를 다른 머신에서 처리하고, GPU 로 트레이닝하는 Master 노드로 전달하도록 아래와 같이 ZeroMQ 세팅을 추가했습니다.

df = get_dataflow_batch(args.datapath, args.train, args.batchsize)
send_dataflow_zmq(df, args.master)

이렇게 처리하면 args.master에 주어진 주소(eg. tcp://192.168.0.2:1258)의 머신에 ZeroMQ로 전처리된 내용을 보내주게 됩니다.

Tensorflow Queue로 전환

Tensorpack에서 나온 데이터는 numpy 배열이니까, tensorflow에 빠르게 Feeding해주기 위해서는 Queue를 하나 구현해야 합니다. 해당 코드는 아래에 있습니다.

https://github.com/ildoonet/tf-openpose/blob/master/pose_dataset.py#L326

Tensorflow에서 제공하는 Queue에 Feeding해주는 형태로 Thread를 구현하면, GPU를 통해 Training되는 등의 시간에 구현된 Thread의 내용이 동작하면서 GPU가 유휴 자원이 되는 것을 막을 수 있습니다.

Dockerization & Mesos

저는 이렇게 구축된 전처리 코드를 Docker를 이용해 Mesos의 Marathon으로 배포했습니다.

이렇게 하면 매우 좋은 부분은 트레이닝 과정에서 얼마든지 전처리기를 늘리거나 줄일 수 있다는 점입니다. 자원 사용량을 봤더니 GPU가 좀 놀더라 싶으면 마라톤으로 'Scale Application’버튼을 누르고 전처리를 돕는 노드의 수를 얼마든지 늘려주면 됩니다. 데이터 Queue를 봤더니, Queue가 넘치더라, 하면 노드의 수를 줄이면 되구요.

image


Multi GPU 코드

트레이닝할 머신에서는 아래의 1줄이면 여러 노드에서 처리된 배치 데이터를 받아볼 수 있습니다.

df = RemoteDataZMQ(args.listen)

이제 전처리된 데이터는 넉넉하게 Queue에 쌓여 있을 테니, 해당 데이터를 가져와서 여러 GPU로 빠르게 처리하는 부분을 잘 구현하면 됩니다. 텐서플로우에서는 Cifar10 데이터에 대해서 MultiGPU를 활용하는 예제를 함께 주고 있습니다.

그런데, 위 코드는 몇가지 불만족스러운 부분이 있습니다. 그 중 가장 큰 문제는

각 GPU별로 Loss를 따로 계산하고, Gradient를 Average하는 코드를 직접 구현했다.

는 부분 일 것 같습니다. 덕분에 코드 가독성도 떨어지고, 작성해주어야하는 부분도 많아진 것이죠.

하지만 Tensorflow Document를 살펴보니, 더 간단한 방법으로 MultiGPU를 활용할 수 있도록 코드를 작성할 수 있었습니다. 크게 보면 아래와 같은 구조로 작성했습니다.

  1. Weight가 동일한 네트워크를 각 GPU에 할당한다.

  2. Tensorpack에서 얻은 Batch Image를 GPU개수 만큼 나눈다.

  3. 각 GPU에서 Forward/Backward를 할당된 인풋에 대해 처리하도록 한다.

네트워크 생성과 Optimizer 생성에 대한 코드가 아래에 있습니다.

    # define model for multi-gpu
    q_inp_split = tf.split(q_inp, args.gpus)
    output_vectmap = []
    output_heatmap = []
    vectmap_losses = []
    heatmap_losses = []
    for gpu_id in range(args.gpus):
        with tf.device(tf.DeviceSpec(device_type="GPU", device_index=gpu_id)):
            with tf.variable_scope(tf.get_variable_scope(), reuse=(gpu_id > 0)):
                if args.model == 'mobilenet_1.0':
                    net = MobilenetNetwork({'image': q_inp_split[gpu_id]}, conv_width=1.0)
                    pretrain_path = './models/pretrained/mobilenet_v1_1.0_224_2017_06_14/mobilenet_v1_1.0_224.ckpt'
                elif args.model == 'mobilenet_0.75':
                    net = MobilenetNetwork({'image': q_inp_split[gpu_id]}, conv_width=0.75)
                    pretrain_path = './models/pretrained/mobilenet_v1_0.75_224_2017_06_14/mobilenet_v1_0.75_224.ckpt'
                elif args.model == 'mobilenet_0.50':
                    net = MobilenetNetwork({'image': q_inp_split[gpu_id]}, conv_width=0.50)
                    pretrain_path = './models/pretrained/mobilenet_v1_0.50_224_2017_06_14/mobilenet_v1_0.50_224.ckpt'
                elif args.model == 'cmu':
                    net = CmuNetwork({'image': q_inp_split[gpu_id]})
                    pretrain_path = './models/numpy/openpose_coco.npy'
                else:
                    raise Exception('Invalid Mode.')
                vect, heat = net.loss_last()
                output_vectmap.append(vect)
                output_heatmap.append(heat)

                l1s, l2s = net.loss_l1_l2()

                for idx, (l1, l2) in enumerate(zip(l1s, l2s)):
                    if gpu_id == 0:
                        vectmap_losses.append([])
                        heatmap_losses.append([])
                    vectmap_losses[idx].append(l1)
                    heatmap_losses[idx].append(l2)

    # define loss
    losses = []
    for l1_idx, l1 in enumerate(vectmap_losses):
        l1_concat = tf.concat(l1, axis=0)
        loss = tf.nn.l2_loss(l1_concat - q_vect, name='loss_l1_stage%d' % l1_idx)
        losses.append(loss)
    for l2_idx, l2 in enumerate(heatmap_losses):
        l2_concat = tf.concat(l2, axis=0)
        loss = tf.nn.l2_loss(l2_concat - q_heat, name='loss_l2_stage%d' % l2_idx)
        losses.append(loss)

    output_vectmap = tf.concat(output_vectmap, axis=0)
    output_heatmap = tf.concat(output_heatmap, axis=0)
    total_loss = tf.reduce_mean(losses)
    total_ll_loss = tf.reduce_mean([
        tf.nn.l2_loss(output_vectmap - q_vect),
        tf.nn.l2_loss(output_heatmap - q_heat)
    ])

    # define optimizer
    global_step = tf.Variable(0, trainable=False)
    starter_learning_rate = args.lr
    momentum = 0.9
    max_epoch = 50
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step,
                                               decay_steps=10000, decay_rate=0.90, staircase=True)
    optimizer = tf.train.RMSPropOptimizer(learning_rate, decay=0.0005, momentum=0.9, epsilon=1e-10)
    train_op = optimizer.minimize(total_loss, global_step, colocate_gradients_with_ops=True)

주요 부분만 살펴보면,

    q_inp_split = tf.split(q_inp, args.gpus)

    ...

    for gpu_id in range(args.gpus):
        with tf.device(tf.DeviceSpec(device_type="GPU", device_index=gpu_id)):
            with tf.variable_scope(tf.get_variable_scope(), reuse=(gpu_id > 0)):
                net = MobilenetNetwork({'image': q_inp_split[gpu_id]}, conv_width=1.0)

위는 Batch를 gpu개수 만큼 쪼개어 넣고 쪼개진 부분으로 각 GPU에 할당된 네트워크에 Feeding하도록 한 부분입니다.

    output_vectmap = tf.concat(output_vectmap, axis=0)
    output_heatmap = tf.concat(output_heatmap, axis=0)
    total_loss = tf.reduce_mean(losses)

여러 gpu에서 나온 output을 tf.concat 을 이용해 하나로 묶고, 이를 이용하여 Loss를 한번에 계산합니다.

    train_op = optimizer.minimize(total_loss, global_step, colocate_gradients_with_ops=True)

그리고 옵티마이저를 정의할 때 colocate_gradients_with_ops=True 옵션을 주어 Forward가 일어났던 GPU에서 Gradient 계산도 일어나도록 처리합니다. 이 부분을 처리하지 않으면 Backward가 특정 gpu에 몰려서 일어나는 등의 이슈가 있을 수 있고 이는 성능향상에 매우 큰 걸림돌이 됩니다.


실제 성능 확인을 해보니 1 gpu, 싱글머신으로 전처리 하는 경우에 있었던 아래와 같은 문제들이 사라졌습니다.

  • 간혹 전처리 된 데이터가 빠르게 피딩되지 않아, GPU가 놀기도 함
  • CPU Intensive하다보니 트레이닝에도 영향을 미침
  • 느림!! 평균적으로 1초에 10개의 이미지를 처리하는 수준.
  • 예상되는 트레이닝 시간 대략 2주!!!

8 GPU에서 트레이닝하는 마스터와 인풋 배치를 생성해주는 여러 노드를 추가함으로써 아래와 같은 성능 향상을 얻었습니다.

  • Master 노드의 CPU가 놀고 있지만 GPU는 100%로 처리됨
  • 빠름!!! 평균적으로 1초에 140개의 이미지를 처리하는 수준.
  • 실제 트레이닝 시간이 하루가 되지 않음!!!

요약

요약하면 아래의 Tip을 드릴 수 있을 것 같습니다.

  • Tensorpack은 매우 만족스러운 데이터 파이프라인 추상화를 제공한다.

    • 필요한 부분을 여러 프로세스를 처리하여 큐에 쌓을 수 있음
    • ZeroMQ를 지원하므로 여러 노드를 활용해 데이터전처리를 처리할 수 있음
  • Tensorflow Optimizer의 colocate_gradients_with_ops 옵션을 활용해 MultiGPU를 최대한 활용하도록 설계

    • 인풋 파이프라인은 1개로 유지하되, 인풋 배치를 GPU 수 만큼 잘라 여러 GPU에서 서로 다른 내용을 처리하도록 함
    • GPU 1개 코드에 비해 코드 변화량이 크지 않게 작성할 수 있음.

Reference


NCCL을 이용한 Efficient한 Tensorflow MultiGPU Training 코드 작성하기
(Curtis Kim) #2

퍼포먼스 튜닝하기 전에 체크포인트가 잘 정리되어 있는 글입니다.

http://tensorpack.readthedocs.io/en/latest/tutorial/performance-tuning.html