nykergoto’s blog

機械学習とpythonをメインに

データコンペサイトを作る DjangoRestFramework編

この記事は atma Advent Calendar 2019 - Qiita 2019/12/21 の記事です。

今年自社のサービスとして オンサイトのデータコンペティション atmaCup をはじめました。 オンサイトデータコンペとは実際に会場に集まり、準備されたデータをテーマに沿って分析・予測を行い、その精度を競うイベントです。 データコンペで有名なのはKaggleですが、みんなで実際に集まり、かつ時間もその日の8時間と短いのが特徴で、 参加者のスキルがオンラインのデータコンペより強く結果に表れます。

このatmaCupですが当然やろうと思うとコンペ用のシステムも必要です。というわけで裏側のシステム 「ぐるぐる」 を僕が作っています。 この時記事ではそのバックエンド部分を担っている DjangoRestFramework についてその便利さとどういう機能を使ってぐるぐるを作っているか、を少し紹介したいと思います。

f:id:dette:20191222153119p:plain

おことわり

この記事ではコードを一部書いていますがプロジェクト全体の構成などは記述していません。ごめんなさい(力尽きました)
DjangoRestFrameworkでやってみたい!と思った方は Django REST Frameworkを使って爆速でAPIを実装する - Qiita こちらの記事などを参考に是非チャレンジしてみて下さい。損はしないです。

どういう構成か

先ずぐるぐるの全体のざっくりとした構成から紹介します。

インフラ

ぐるぐるではAWSを使っていて ECS (Fargate) によってコンテナとしてデプロイしています。 更新は gitlab の master merge のタイミングで gitlab-CI によって ECR に push & 新しいイメージを使って ECS を更新という CI を組んでいます。

Static File はすべて CloudFront 経由で S3 につなげていて、データベースは private subnet 上の RDS に接続というよくある構成です。

開発環境

docker / docker-compose でアプリケーションごとにイメージを作っています。

アプリケーション

フロントエンドとバックエンドを切り離したRESTFULLな構成です。特にコンペはスコア計算時にかなり重たい処理が入りますので、フロントと分離するのは自然かなと思っています。 フロントエンドはNuxt.jsでバックエンドはDjangoRestFrameworkを採用しています。

DjangoRestFramework

DjangoRestFramework (略して DRFということもあります) は python の web framework の djangorest api 用に拡張したライブラリです。 python で書かれていることもあり、機械学習でいつも python を使っている僕にとってはかなり馴染みやすいフレームワークでした。

個人的に押しの子なのですが Qiita でも余り人気がなかったり日本でははやっていないようで少し残念です…少しでもユーザーを増やしたいな、という思いもあってこれを書いていたりします。

DjangoRestFramework の立ち位置

DjangoRestFramework は python の web framework の中ではかなりカバー範囲の広いフレームワークです。カバー範囲が広いのでこれだけですべて完結できる便利さがある一方その分重たいという欠点はあります。 python の web framework には他にも flask や falcon などより軽量な物があり、機械学習用途ではこちらが使われていることが多い印象です。

とはいえそれを上回る実装の速度感が魅力で、この記事ではその一部でもお伝えできればなと思っています。

DRF のいいところ

  • DBのことを気にしなくても良い設計
  • Model に関連した機能が豊富
  • Adminsite がほとんどなにもしなくても出来る
  • Document ページがマジで何もしなくても出来る

の4点にあると思っています。

DB のことを気にしなくていい

DRFを使っている時、DBとのやり取りを気にすることはほぼありません。たとえばですが各コンペ用のテーブルが作りたいな、と思ったとします。 Submission に必要そうな情報というと

  • コンペのタイトル
  • 説明文
  • 締切日

など要りそうですね。この場合であればモデルとして以下を定義して

from django.db import models


class Competition(models.Model):
    title = models.CharField(max_length=128, help_text='foo')
    description = models.TextField(max_length=10000, help_text='説明文')
    finished_at = models.DateTimeField(null=True, blank=True,
                                       auto_created=True, help_text='コンペ終了の日時')

migration をするだけで簡単に table 作成をやってくれます。

# migration ファイルの作成
[django@700e14ec6a91 django]$ python manage.py makemigrations competition
Migrations for 'competition':
  diary/competition/migrations/0001_initial.py
    - Create model Competition
# DBへの反映
[django@700e14ec6a91 django]$ python manage.py migrate
Operations to perform:
  Apply all migrations: admin, auth, competition, contenttypes, sessions, v1
Running migrations:
  Applying competition.0001_initial... OK    

あとは makemigration 時に出来たファイルを git に乗せて、本番用サーバーで python manage.py migrate だけ動くようにしておけば model で定義した最新の状態にまで DB の状態を変えてくれるのです。楽ちんですね。 なおこの migration はかなり賢いのでたとえば field の名前を間違えたので直したい、とか新しく field を定義したい、みたいなことぐらいであれば自動的に検知して DB の column 名を変更するような SQL を発行してくれます。(今回は特に時間がなかったので基本的に何も考えずにとにかくmigrationしているので competition だけで40ぐらいファイルができていますが不具合は一度もありませんでした)。

Model に関連した機能が豊富

上記の Model と migration などは Django の機能なんですが (import も django からなのがわかると思います), ここから API のエンドポイント作成までが DRF が担う部分です。 このとき Model に関連した view を作るとき DRF の爆速感が本領を発揮します。

まずDRFでは大きく2つの概念があります。

  • serializer
  • View
    • Url をどの処理に渡すのかを定義する所

この serializer がやっている処理は地道で面倒ですが必要です、というのもユーザーは良からぬリクエストをしてくる可能性がありますから int にしたいときは int へのキャスト処理を書いて〜とやる必要があります。 これを自分で書いていると案外面倒です。

もっというとこの変換はデータベース(すなわち Django の Model) に関連したものが多いです。 たとえばチーム作成をする時を考えてみてください。作成用のエンドポイントでは、チームのmodelに関連した情報をユーザーは送ってくるので、チームのmodelで定義したフィールドのそれぞれに対して json-> python object への変換を書くことになるはずです。 要するに Model の定義と Serialzier の変換は対応関係にある場合が多いのですね。

そこで DRF だと Model の定義に合わせて自動的に変換する便利クラス ModelSerializer があります。これを使うと先の Competition は

from rest_framework import serializers

from .models import Competition


class CompetitionSerializer(serializers.ModelSerializer):
    class Meta:
        model = Competition
        fields = '__all__'

みたくかけます。これだけで Model に定義された _all__ field, ようするに title, description, finished_at の3つのフィールドそれぞれをいい感じに変換してくれます。 (場合によっては全部必要ない場合もありますのでその場合は exclude などで除外フィールドを書くことも可能です)

あとは view を作るだけですが、こちらもモデルから作成してくれる ModelViewSet を指定して先の serializer と model objects を登録した viewset を作って

from rest_framework.viewsets import ModelViewSet

from .models import Competition
from .serializers import CompetitionSerializer


class CompetitionViewSet(ModelViewSet):
    serializer_class = CompetitionSerializer
    queryset = Competition.objects.all()

これを urls に加えます.

from rest_framework.routers import DefaultRouter

from . import views

router = DefaultRouter()

router.register('competitions', views.CompetitionViewSet, basename='competitions')

urlpatterns = [
    *router.urls
]

基本的には, これだけで competition に対する CRUD のエンドポイントが出来てしまいます。簡単!

あとは承認周り(どんなユーザーがエンドポイントにアクセスできるか)やValidation(どういう状態のリクエストは許可するか)なども DRF 側である程度指定されていてるので、それに沿って書くことで何がどこにあるのかよくわからない状態を防げますので、複数人開発や規模が大きいプロジェクトの場合にも強いのかなと思っています。

管理者用の画面

データベースに直接見に行くのでもいいですがwebから見れる管理者側の画面があるとなにか変なことがあったときやノンエンジニアの人に頼んだりする時とても便利です。 今回のプロジェクトは時間は無いのでフロントを作らずすべて Django Admin で実装しています。

実装と言ってもめっちゃ簡単で、最低 admin site に表示するだけなら下記の4行あれば作れてしまいます。 これで competition の CRUD は admin 側から行えるようになります。簡単。

from django.contrib import admin

from . import models

@admin.register(models.Competition)
class CompetitionAdmin(admin.ModelAdmin):
    pass

これだとたとえば「いま終わっているコンペだけ表示したい!」とかの要望に答えられないのでちょっと手を入れて実際の master branch は以下のようになっています。 手を入れると言っても表示する項目書いているだけなんですけどね…

@admin.register(models.Competition)
class CompetitionAdmin(admin.ModelAdmin):
    list_display = (
        'id', 'published', 'private_rank_is_confirmed',
        'title', 'finished_at', 'applying_teams', 'total_submissions', 'is_finished',
        'calculate_status')
    ordering = ('-finished_at',)
    search_fields = ('description',)

f:id:dette:20191222144625p:plain
実際の adminsite のキャプチャ

Documentページ

実際にこのAPIをデプロイしたとしましょう。そうすると次はフロントエンドとの繋ぎこみをする必要があります。 この時結構厄介なのがAPIの仕様をFront作業者にどう伝えるのか、という問題です。

よくある方法とその問題点

一般には仕様書的なものを書いて、それを共有したりするんだと思いますがそれだとどうしても仕様と実装の二重管理になるため

  1. 仕様と同じようにAPIが作られているかがわからない(APIがまちがっている)
  2. そもそも仕様が古くて今のAPIとずれている(仕様書がまちがっている)
  3. 今はまだそのエンドポイントができていない(未着手だった)

といった理由で上手くAPIを使えないことがあります。

DRF のドキュメント生成機能

DRFは今の実装状態から document を自動生成して、それもエンドポイントに付け加える機能があります。 要するに今の実装で使えるエンドポイント一覧をいいかんじに表示するアプリケーションとしても機能するということです。これは見てもらったほうが早いと思うので実際の表示画面をお見せしたいと思います。 このドキュメント機能を使う場合は urls に一行追加すれば OK です。簡単ですね。

from django.contrib import admin
from django.urls import path, include
from rest_framework.documentation import include_docs_urls

urlpatterns = [
    path('admin/', admin.site.urls),
    # 中略
    path('docs', include_docs_urls(title='documentation')) # これが document 用の url
]

この状態で /docs にアクセスすると実装されている API 一覧が見れます。

f:id:dette:20191222143529p:plain
こういうのが出てくる。左下からログインするとそのユーザーでログインした時の挙動も再現できる。

この場からエンドポイントを叩くことも可能です。たとえば nyk510 が join しているチーム一覧などは以下のような感じ。

f:id:dette:20191222143903p:plain
engaged_in=nyk510

またちっちゃいですが左下にログインをするタブがあり、そこから特定のユーザーとしてログインした状態でエンドポイントを叩くことも可能です。 これがあるので、最近は Postman みたいな API 叩くようクライアントを使う機会がとても減りました。

また表示されるのは「実装されている」エンドポイントであることも重要です。これによって 常に実装を正としてAPIを参照することが可能です。 ドキュメントを作っている時に比べて、フロントエンドとのコミュニケーションがかなりスムーズに行なるようになってかなりいいなーと思っています。

作った機能

というわけで便利なDRFで色々と作りました。 基本的な機能は Kaggle を参考にチームマージ機能やディスカッション、それに紐づくコメントといいね、通知などを実装しています。

そのなかでもポイントを挙げると以下のような感じでしょうか。

  • Publicは即座に・PrivateScoreはコンペが終わってから
  • Privateスコア計算をadminsiteから実行できるように
  • discussionをnotebookから作成
  • 通知機能
  • LB周りのテスト

Publicは即座に・PrivateScoreはコンペが終わってから

これは完全に僕のこだわりなのですが、なんとなく主催者だけが最終スコアがわかっているのって卑怯な感じが無いですか? という理由からコンペが終わるまで submission の private score は計算されないような仕組みになっています。 (そのせいでコンペ終了後のスコア計算が上手く回るかどうか毎回めちゃくちゃドキドキしています。これとても心臓に悪いので、この仕様は将来的には変えるかも知れません…)

反対に PublicScore はできるだけ素早く返せるよう、あえて他のインスタンスで計算する構成にしていません。これは正直コンペ開催中ぐらいであれば ECS のクラスタ数を増やすなどで対応できるだろうという読みと、なにより僕のユーザー体験としてスコアがなかなか計算されない辛さを体感しているのが大きいです。実際見ていても数百万行の AUC とかならまだしも 1万行程度の RMSE ならそんなに計算コストもかからなさそうなので、当分はこの方針で行きたいと思っています。

Privateスコア計算を adminsite から実行できるように

PrivateScoreは上記に書いたように計算冴えていませんので、どこかのタイミングで計算を実行させる必要があります。 はじめは terminal からやるようにしていたのですが atmaCup#2 から admin site からボタンひとつで実行できるようにしました。 といってもやることはかなり簡単で、 adminsite 用のテンプレートをちょっと編集して

{% extends 'admin/change_form.html' %}

{% block submit_buttons_bottom %}
{{ block.super }}
<h2>Jobs</h2>
<div class="">
    <div>
        <input type="submit" value="Run Calculate Score" name="_calculate_score">
        <div style="color: #7b7b7b; padding: 8px">コンペに提出されたサブミットすべてに対して, private/public のスコア計算を実行します。<br>
        [NOTE] コンペが終了していない時実行できません.</div>
    </div>
</div>
{% endblock %}

あとはこのファイルを admin site の change_form_template に加えるだけです。

@admin.register(models.Competition)
class CompetitionAdmin(admin.ModelAdmin):
    # 中略
    change_form_template = 'admin/competition/change_form.html'

    def response_change(self, request, obj):
        if '_calculate_score' not in request.POST:
            return super(CompetitionAdmin, self).response_change(request, obj)

        if not obj.is_finished:
            self.message_user(request, message='コンペが終了していません. 計算を行えるのはコンペ終了後です. ', level='warning')
            return HttpResponseRedirect('.')

        self.message_user(request, message=f'{obj.title} の submission に対するスコア計算を開始しました. (合計{obj.total_submissions}件)')

        t = threading.Thread(target=run_score_job, args=(obj,))
        t.setDaemon(True)
        t.start()

        return self._response_post_save(request, obj)

f:id:dette:20191222144054p:plain

これでボタンが押されたタイミングで admin class に指定された method (今の場合は _calculate_score) が動くようになります。 self.message_user を使うとオシャレに popup でエラーなどのメッセージも出せてとても便利でした。

discussion を notebookから作成

これは途中で discussion 作った後に「notebookから直接作れたら便利じゃない?」という声をうけて作ったものです。 めんどくさそうですが nbconvert を使うと案外楽です(画像以外は)。 今回の要件として最終的な format が markdown だったので MarkdownExporter を使っていますが他のフォーマットでも同様に作れると思います。

若干面倒なのは markdown 生成後の画像ファイルがもとのままだとテキストとして代入されているところでしょうか。 今回のDRFではstaticfileは別途保存するようにしていたので ContentFile で画像をobject化して別のモデル UploadFile を使って s3 などに upload するという作業を入れています。

from django.core.files.base import ContentFile
from nbconvert.exporters import MarkdownExporter
from rest_framework.decorators import api_view, parser_classes, permission_classes
from rest_framework.parsers import FormParser, MultiPartParser
from rest_framework.permissions import IsAuthenticated
from rest_framework.views import Response, status

from main.discussions.models import UploadFile


@api_view(['POST'])
@parser_classes([MultiPartParser, FormParser])
@permission_classes([IsAuthenticated])
def notebook_to_other_format(request):
    """
    Jupyter Notebook 形式のファイルから markwon を作成します。
    このエンドポイントにはログインしているユーザーでないとアクセスすることはできません。

    ### Parameter

    * `file`: 変換したい File Object です. 必ず拡張子は `.ipynb` である必要があります
    """
    try:
        file_obj = request.data.get('file', None)

        if file_obj is None:
            raise ValueError('file is required')
        name, ext = file_obj.name.split('.')

        if ext != 'ipynb':
            raise ValueError('File Extension Must Be `ipynb`')

        txt, metadata = MarkdownExporter().from_file(file_obj)
        outputs = metadata['outputs']
        metadata['filename'] = name

        for key, img in outputs.items():
            upload_file = UploadFile(name=key)
            content = ContentFile(img)
            upload_file.file.save(key, content)
            upload_file.save()
            txt = txt.replace(key, upload_file.file.url)
        return Response({
            'body': txt,
            'metadata': metadata
        })
    except Exception as e:
        return Response(data=dict(error=str(e)), status=status.HTTP_400_BAD_REQUEST)

通知機能

自分が参加しているコンペに新しい discussion ができたら通知してほしいなーって思ったのでつけた機能です。 主に django-notification の機能を使っています。基本的に通知をしたいイベントをトリガにして notify.send を呼び出すだけでOKです。

以下の場合だと Discussion (ユーザーがいろんな議論をするディスカッションを管理するためのモデル) が作成された時に、コンペに参加しているユーザー全員に対して通知を送るようにしています。 複数ユーザーへの対応も notification 側でやってくれるのでかなり便利です。

from notifications.signals import notify

@receiver(post_save, sender=Discussion)
def notify_discussion_create_handler(sender, instance: Discussion, created, **kwargs):
    """
    ディスカッションが作成されたことを通知する handler
    """
    if not created or not instance.competition:
        return

    # ディスカッション作成者以外で, コンペに参加しているユーザーに対して通知する
    target_users = User.objects.filter(Q(teams_as_owner__competition=instance.competition)
                                       | Q(teams_as_member__competition=instance.competition)
                                       ).exclude(pk=instance.created_by.pk).distinct()
    notify.send(instance.created_by,
                recipient=target_users,
                verb='create',
                description=f'新しい discussion {instance.short_title} が追加されました',
                target=instance)

あとは notification の model に対してアクセスする endpoint と既読や削除をするような endpoint を作ればOKです。

from notifications.models import Notification
from rest_framework.decorators import action
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.schemas import ManualSchema
from rest_framework.viewsets import ReadOnlyModelViewSet

from .serializers import NotificationSerializer

class NotificationViewSet(ReadOnlyModelViewSet):
    serializer_class = NotificationSerializer
    queryset = Notification.objects.order_by('-timestamp') \
        .select_related('actor_content_type',
                        'target_content_type',
                        'action_object_content_type').all()
    permission_classes = (IsAuthenticated,)

    def get_queryset(self):
        return self.queryset.filter(recipient=self.request.user)

    @action(methods=['PUT'], detail=False, url_path='mark-all-as-read',
            schema=ManualSchema(fields=[], description='ユーザーに紐づくすべての通知を既読に更新します'))
    def all_as_read(self, *args, **kwargs):
        qs = self.get_queryset()
        not_read = qs.filter(unread=True).count()
        qs.mark_all_as_read()
        return Response(data={'count': not_read})

    @action(methods=['PUT'], detail=False, url_path='mark-all-as-delete',
            schema=ManualSchema(fields=[], description='ユーザーに紐づくすべての通知を既読に更新します'))
    def all_as_delete(self, *args, **kwargs):
        qs = self.get_queryset()
        n_active = qs.filter(deleted=False).count()
        qs.mark_all_as_deleted()
        return Response(data={'count': n_active})

フロントとつなげるとこんな感じに通知を出すことが出来ます。ちゃんと object に「どこから通知がきたのか」が保存されていますのでフロント側ですこし処理を入れればいいねされたディスカッションのページに移動することが出来ます。

f:id:dette:20191222145805p:plain

これ以外にも以下のようなイベントで通知を行うようにしています。

  • 自分が作ったディスカッションにコメントがついた時
  • 自分が作ったコメントにコメントがついた or いいねがついた時
  • チームマージのリクエストが来た時

これで少しでも見逃しに気づいてくれることがふえたらいいなーと思っています。(コンペやってる時は時間に余裕がないので、あえて煩いぐらいに出そうと思っています)

LB周りのテスト

ちょっとめんどくさいのが submission や Leader Board が絡むロジック部分のテストです。 たとえば RMSE でスコアリングするようなコンペがあって、このコンペでの順位計算がちゃんとできていることをテストしたい! と思ったとします。

この時やらないといけないのはざっとあげても以下の要件があります

  • private/public のスコア計算がちゃんと出来るか
  • private/public のランキングが正しいか
    • 小さい順にちゃんとなっているか
    • 1回もsubmitしていない人(private scoreがNullの人)が順位がつかないようになっているか
  • private スコアの計算対象が選択した submit になっているか
  • 同じスコアの人が居た場合 submit が早い人が順位が高くなるか
  • team merge した時に submit のひも付けもちゃんと merge してその時の public score も過去最高のものが選ばれるか
    • マージしたのにスコアがマージされないと困る
  • LateSubmission がランキングに影響しないか
    • あとで submission した値で更新されると困る

そして順序などは RMSE のように小さければ良いものと AUC のように大きければ良い物の2つがあるため、これらは別々のテストケースとしてテストする必要があります。 これを実現するために

  • 特定の metric で特定の public/private スコアになるような submission を作成する Fixture クラスを作成
    • そのクラスに対して metric の値通りに計算ができているかどうかのテストケースを個別に作成
  • 上記の Fixture クラスを使ってランキング計算等のエンドポイントのテストを作成

という2段階のテストをすることにしました。正直これをやっている時が一番大変だったかもしれません、がミスっていると一番困るところでもあるので致し方なし…

それ以外にも

  • 自分の submission 以外にはアクセスできないこと
  • マージリクエストを許可できるのはオーナー権限だけ
  • privateLB はコンペ終了かつ管理者がOKしていないとアクセスできないこと

みたいな基本的な権限周りのテストも入れています。

使っているライブラリ

主に pytestparameterized というテスト時にパラメータを指定できるライブラリを使って実現しています。 また late submission のように時間がからむテストでは未来の状態を作る必要があります(たとえば今日の18:00終了のコンペに対して明日の16:00にアクセスしたらOKみたいなこと)ので、時間をそのタイミングだけ変更することが出来るライブラリ freezegun を使っています。

# freezegan を使っているテストの一例. with freeze_time のあたり.
    def test_not_allow_change_selected_submit_after_competition_is_finished(self):
        """コンペ終了後に submit file の選択を変えられないこと"""

        # 10回 submit する
        submit_ids = []
        for _ in range(10):
            response = self._submit_to_my_team()
            sub_id = response.json()['id']
            submit_ids.append(sub_id)

        # 一番最後の submit を選択できる
        res = self.client.patch(self.url_from_pk(submit_ids[-1]), data=dict(selected=True))
        self.assertEqual(res.status_code, 200)
        self.assertIs(res.json()['selected'], True)

        # コンペ終了のちょっと前 (1時間前) なら選択を変えられる
        with freeze_time(self.competition.finished_at - timedelta(hours=1)):
            res = self.client.patch(self.url_from_pk(submit_ids[-2]), data=dict(selected=True))
            self.assertEqual(res.status_code, 200)
            self.assertIs(res.json()['selected'], True)

        # コンペ終了後には選択 submit を変えられない
        with freeze_time(self.competition.finished_at + timedelta(hours=2)):
            res = self.client.patch(self.url_from_pk(submit_ids[-3]), data=dict(selected=True))
            self.assertEqual(res.status_code, 400, res.json())

まとめ

早足気味ですがぐるぐるの裏側で使っているDRFの良い所と、実際に実装したときのお気持ちやポイント的なものを書いてみました。 データコンペサイトは普通の web-application に比べて考えることが結構あるので大変ですが、頭を使うのでとても楽しかったなというのが振り返ってみての印象です。やはりいつも使っているサービスを実装するぞ、っていう意気込みもあるのかも知れないですね。

これを見て DRF 面白そう!使ってみようかなー!と思っていただければ幸いです。:D