Giter VIP home page Giter VIP logo

til's People

Contributors

yeomko22 avatar

Stargazers

 avatar

Watchers

 avatar  avatar

til's Issues

kafka mirror maker 2

kafka mirror maker2

  • 서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션이다.
  • 직접 프로듀서와 컨슈머를 사용해서 미러링을 구현하는 애플리케이션을 개발하면 되지만, 미러메이커를 사용하는 이유는 토픽의 모든 것을 복제할 필요성이 있기 때문이다.
  • 특히 동일 파티션에 동일 레코드가 들어가도록 하는 작업은 파티셔너에 대한 정보 없이는 불가하다.
  • 또한 복제하는 토픽의 파티션 개수가 달라지면 복제된 데이터를 저장하는 토픽의 파티션 개수도 달라져야 한다. 이런 기능을 지원하는 것이 카프카 미러메이커이다.
  • 미러메이커2는 단방향, 양방향 복제, ACL 복제, 새 토픽 자동 감지 등의 기능을 지원한다. 이는 클러스터를 2개 이상 지원할 때 빛이 난다.

algo expert

Client Sever Architecture

  • 웹 서비스에서 클라이언트는 웹 브라우저이다.
  • 도메인 이름을 입력하면 DNS 서버에 요청을 보내어 해당 도메인 네임의 IP 주소를 얻어온다.
  • 이 IP 주소로 서버에 요청을 보내는 것이다.
  • dig 명령어를 통해서 도메인 이름에 해당하는 ip 주소를 알 수 있다.
$ dig algoexprt.io

; <<>> DiG 9.10.6 <<>> algoexpert.io
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 62516
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;algoexpert.io.			IN	A

;; ANSWER SECTION:
algoexpert.io.		3600	IN	A	35.202.194.70

;; Query time: 61 msec
;; SERVER: 61.41.153.2#53(61.41.153.2)
;; WHEN: Wed Jun 23 16:41:00 KST 2021
;; MSG SIZE  rcvd: 58
  • 서버는 특정 포트에서 요청이 들어오기를 기다린다. 포트는 65535개를 사용 가능하다.
# 로컬 영역에서 TCP나 UDP 통신을 수신하기 위해 포트를 개방
$ nc -l 8021

# 로컬 컴퓨터의 8021 포트로 접속, TCP 통신을 수행
$ nc 127.0.0.1 8021

2021 7월 1주차 TODO

  • dependency injection 공부, 개념 정리
  • python coroutine, eventloop, python에서의 비동기 처리 공부
  • SOLID 패턴 관련 예제 코드 살펴보고 정리할 것

ML model configuration test

class PipelineWithConfig(SimplePipeline):
    def __init__(self, config):
        # Call the inherited SimplePipeline __init__ method first.
        super().__init__()
        # Pass in a config object which we use during the train method.
        self.config = config
            
    def train(self, algorithm=LogisticRegression):
        # note that we instantiate the LogisticRegression classifier 
        # with params from the pipeline config
        self.model = algorithm(solver=self.config.get('solver'),
                               multi_class=self.config.get('multi_class'))
        self.model.fit(self.X_train, self.y_train)
  • SimplePipeline을 상속 받는 PipelineWithConfig 클래스를 만든다. 이 때 생성자에 config 객체를 전달해준다.
  • 이는 PipelineWIthConfig 클래스가 필요로 하는 객체를 외부에서 전달받는 dependency injection이다.
class TestIrisConfig(unittest.TestCase):
    def setUp(self):
        # We prepare the pipeline for use in the tests
        config = {'solver': 'lbfgs', 'multi_class': 'auto'}
        self.pipeline = PipelineWithConfig(config=config)
        self.pipeline.run_pipeline()
    
    def test_pipeline_config(self):
        # Given
        # fetch model config using sklearn get_params()
        # https://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html#sklearn.base.BaseEstimator.get_params
        model_params = self.pipeline.model.get_params()
        
        # Then
        self.assertTrue(model_params['solver'] in ENABLED_MODEL_SOLVERS)
  • 이제 모델을 학습시킬 때 파라미터를 함께 전달해줘서 학습을 진행한다.
  • 그 다음 model.get_params() 함수를 통해서 모델 파라미터를 가져오고, 여기서 원하는 config 값이 제대로 전달되었는 지를 테스트한다.

kafka intro

1.1. 카프카 탄생

  • 링크드인에서 시작, 파편화 된 데이터 파이프라인의 복잡도를 낮추는 것을 목적으로 함

스크린샷 2021-09-16 오후 5 05 35

  • 이렇게 복잡하게 꼬여있는 복잡하게 꼬여있는 시스템을 단일화 하고자 함

스크린샷 2021-09-16 오후 5 05 40

  • 이를 통해 웹 사이트, 애플리케이션, 센서 등에서 취함한 데이터 스트림을 한 곳에서 실시간을 관리할 수 있게 되었다.
  • 기업의 대용량 데엍 수집, 실시간 스트림으로 소비할 수 있게 만들어주는 일종의 중추!
  • 카프카를 중앙애 두어 소스 애플리케이션과 타깃 애플리케이션 사이의 의존도를 최소화하여 커플링을 완화
  • 기존 1 대 1 매칭 시스템은 한쪽의 이슈가 다른 한쪽의 애플리케이션에 곧바로 영향, 카프카는 이러한 의돈도를 타파

kafka command line tool

카프카 토픽

  • 카프카에서 데이터를 구분하는 가장 기본적인 개념, RDBMS의 테이블과 유사
  • 하나의 토픽은 여러 파티션을 가질 수 있다. 토픽 내부에서도 파티션을 통해 데이터의 종류를 나누어 처리할 수 있다.

토픽 생성 2가지 방법

  • 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할 때
  • 커맨드 라인 툴로 명시적으로 토픽 생성 (추천함. 왜? 토픽마다 처리되어야 하는 데이터의 특성이 다르기 때문)

토픽 생성시 주의할 점

  • 동시 데이터 처리량이 많은 토픽의 경우 파티션을 크게 설정할 것
  • 단기간 데이터 처리만 필요할 경우 보관 기간을 짧게 설정할 것
  • 그러므로 들어오는 데이터의 양과 병렬 처리 용량을 잘 파악해서 파티션 크기를 정할 것

kafka consumer detail

multithread consumer

  • 파티션을 여러개로 운영하는 경우 데이터 병렬 처리를 위해 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 가장 좋다.
  • 토픽은 1개 이상의 파티션으로 이루어져 있으며, 1개의 파티션은 1개의 컨슈머가 할당되어 데이터를 처리할 수 있다.
  • n개 스레드를 가진 1개의 프로세스를 운영하거나, 1개 스레드를 가진 n개의 프로세스를 운영하는 방법이 있으며, 이는 개발자 선택이다.
  • 컨슈머 스레드 비정상 종료일 경우 프로세스가 종료될 수 있고, 이럴 경우 다른 스레드에 영향을 줄 수 있다.
  • 데이터의 중복 또는 유실을 조심해라

컨슈머 운영 전략

  • 멀티 워커 스레드 전략: 컨슈머 스레드는 1개만 실행, 데이터 처리를 담당하는 워커 스레드는 여러개를 실행하는 방법
  • 컨슈머 멀티 스레드 전략: 컨슈머 인스턴스에서 poll 메서드를 호출하는 스레드를 여러개 띄워 사용하는 방법

kafka admin api

admin API

  • kafka 내부 옵션 확인하는 확실한 방법: 브로커에 접속, 카프카 브로커 옵션 확인, 그러나 귀찮다!
  • 내부 옵션 설정 및 조회를 위해 AdminClient 클래스를 제공한다.

AdminClient 활용 예시

  • 구독하는 토픽의 파티션 개수만큼 스레드를 새엇ㅇ하고 싶을 때, 스레드 생성 전에 해당 토픽의 파티션 개수를 admin API를 통해 가져옴
  • 웹 대쉬보드를 통해 ACL이 적용된 클러스터의 리소스 권한 규칙 추가 가능
  • 특정 토픽의 데이터 양이 늘어남을 감지하고 AdminClient 클래스로 해당 토픽의 파티션 늘려주기

adminClient가 제공하는 주요 메서드

describeCluster(DescribeClusterOptions options)
listTopics(ListTopicsOptions options)
listConsumerGroups(ListConsumerGroupsOptions options)
createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options)
createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options)
createAcls(Collection<AclBinding> acls, CreateAclsOptions options)

kafka producer detail

acks 옵션

  • 0, 1, -1 값을 가질 수 있음
  • 0: 프로듀서가 리더 파티션으로 데이터를 전송했을 때, 리더 파티션으로 데이터가 저장되었는지 확인하지 않는다는 뜻, 이 떄느 오프셋 정보도 확인이 불가. 따라서 retries 옵션도 무의미, 1이나 -1로 설정했을 때보다 성능 상의 우위가 있으나, 데이터 유실 위험이 있다.
  • 1: 리더 파티션에만 정상적으로 적재되었는지 확인, 실패시 리더 파티션에 적재 성공할 때까지 retry 수행. 그러나 이는 팔로워 파티션에 적재 되었는지 여부는 체크 안함. 여전히 유실의 위험이 있다.
  • -1: 리더, 팔로워 파티션에 모두 적재도이었는지 여부를 확인, 가장 느리다. 그럼에도 일부 브로커 장애 발생 상황에서도 안전하게 데이터를 전송하고 저장할 수 있다.
  • all 옵션 값은 min.insync.replicas 옵션을 바라본다. 저 옵션값만큼만 만족하면 된다. 따라서 min.insync.replicas 옵션값을 2로 설정했을 때부터 acks=all로 설정하는 의미가 있다.
  • 따라서 토픽의 레플리카 수는 3, min.insync.replicas=2, acks=all을 추천함

21. 7. week3 TODO

7. 15. Thu

TODO

  • (work) get srcp experiment results and analysis
  • (work) reestablish srcp airflow pipeline
  • (algorithm) solve 2+ leetcode problems
  • (mlops) start read kubeflow book (~ch 4.) and summarize it
  • (stock) summarize book I read yesterday
  • (exercise) 100 push ups 30 pull ups

Review

  • spent too much time to solve algorithm problems (5+ problems)
  • spent too much time to restore airflow pipeline and collect fn true cases
  • didn't study stock. I should do it tommorrow.

python coroutines

Reference

coroutine 개념

  • 서브 루틴의 더 일반화 된 개념이다.
  • 원래는 메인 루틴에서 함수를 호출하면 서브 루틴으로 진입하고, 함수가 종료되면 서브 루틴은 종료가 된다.
  • 코루틴은 이 개념을 확장해서 함수 호출 시 진입하고, 빠져나갔다가 재진입 할 수 있다.
  • 이를 이용해서 async / await가 구현된다.

coroutine을 만드는 방법

async def read_data(db):
    pass

@types.coroutine
def process_data(db):
    data = yield from read_data(db)
    ...

인터뷰 준비 플랜

data structures

  • sorting

알고리즘

  • leetcode 알고리즘 문제 풀이 꾸준히
  • [ ]

프로젝트

  • 포트폴리오 사이트 작성

그 외

  • 인성면접 질문들 준비

Helm study

  • kubernetes의 패키지 매니저
  • kubernetes 자원들의 생성, 버전 관리, 롤백, 삭제 등을 지원

ML monitoring - Intro

Deploying a model to production

스크린샷 2021-09-20 오후 6 30 24

  • 모델을 배포한다는 것은 live 환경에 모델을 올려놓고 live data를 받는다는 얘기다.
  • 더 이상 research environment에서 historical data로 모델을 학습시키지 않는다.
  • research environment와 production environment를 구분해야한다.

Scenarios we often encounter

  • deployment of first model
  • replacement of an existing model
  • editing deployed model (minor tweak)
    -> 두번째 케이스에 집중하는 코스가 될 것

kafka consumer API

Consumer API

public class SimpleConsumer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record: records) {
                logger.info("{}", record);
            }
        }
    }
}
  • 컨슈머 그룹을 통해서 컨슈머의 목적을 구분 가능
  • 무한 루프 내에서 poll 함수를 호출해서 카프카 클러스터로부터 데이터를 읽어온다.
  • poll 함수의 결과로 ConsumerRecord 리스트를 반호나한다. poll 메서드는 Duration 타입을 인자로 받는데, 이는 컨슈머 버퍼에 데이터를 기다리기 위한 타임아웃 간격을 의미한다.

컨슈머를 운영하는 2가지 방법

  • 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것
  • 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것

컨슈머 그룹 운영

  • 하나의 컨슈머는 토픽에 묶인 1개 이상의 파티션들에 할당되어 데이터를 가져갈 수 있다.
  • 이 떄, 하나의 파티션은 컨슈머 그룹의 최대 1개의 컨슈머에게만 할당된다.
  • 따라서 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야한다.
  • 컨슈머 그룹은 다른 컨슈머 그룹과 격리되는특징이 있다.

컨슈머 그룹 예시 - 시스템 정보 확인 파이프라인

12

  • 데이터를 카프카 브로커에 토픽, 3 파티션으로 저장
  • 이를 별개의 컨슈머 그룹을 만들어서 엘라스틱 서치와 하둡에서 각각 처리
  • 이렇게 하면 엘라스틱 서치나 하둡 둘 중 한군데에 장애가 나더라도 나머지 하나는 정상 동작함

컨슈머 리밸런싱

  • 만일 컨슈머 그룹의 컨슈머가 장애를 일으킨다면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다. 이 과정을 리밸런싱이라 부른다.
  • 컨슈머 추가 혹은 컨슈머 제외에서 이 리밸런싱이 일어남
  • 카프카 브로커 중 한 대가 그룹 조정자 역할을 수행한다.
  • 컨슈머는 브로커로부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록한다.

오프셋 커밋

  • enable.auto.commit=true로 두면 poll() 메서드가 수행될 때 일정 간격마다 오프셋을 커밋한다.
  • 그러나 컨슈머 강제 종료나 리밸런싱 등이 발생하면 데이터 중복이나 유실 발생이 가능하다.
  • 데이터 중복이나 유실을 허용하지 않는 서비스라면 auto commit을 사용해서는 안된다.
  • 명시적으로 오프셋 커밋을 수행하려면 poll() 메서드 호출 이후 반환 받은 데이터의 처리가 완료되고 commitSync() 메서드를 호출하면 된다.
  • 비동기로 하고 싶으면 commitAsync() 함수를 호출할 것

컨슈머 내부구조

13

  • 컨슈머는 poll() 호출 시점에 데이터를 가져오는 것이 아니라 내부적으로 Fetcher 인스턴스가 생성되어 미리 레코드들을 내부 큐로 가져온다.
  • poll() 메서드를 호출하면 컨슈머 내부 큐에 있는 레코드들을 반환받아 처리를 수행하게 된다.

컨슈머 필수 옵션

  • bootstrap.servers: 프카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성
  • key.serializer: 레코드의 메시지 키를 직렬화 하는 클래스를 지정
  • value.serializer: 레코드의 메시지 값을 직렬화하는 클래스 지정

컨슈머 선택 옵션

  • group.id: 컨슈머 그룹 아이디 지정, subscribe() 메서드로 토픽을 구독하여 사용할 때는 이 옵션이 필수
  • auto.offset.reset: 지정된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션, latest, earliest, none 중 1개로 설정
  • enable.auto.commit: 자동 커밋 or 수동 커밋
  • auto.commit.interval.ms: 오프셋 커밋 간격 지정
  • max.poll.records: poll() 메서드를 통해 반호나되는 레코드 개수 지정
  • session.timeout.ms: 컨슈머와 브로커와 연결이 끊기는 최대 시간
  • heartbeat.interval.ms: 하트비트 전송 시간 간격
  • max.poll.interval.ms: poll() 메서드를 호출하는 간격의 최대 시간 지정
  • isolation.level: 트랜잭션 프로듀서가 레코들르 트랜젝션 단위로 보낼 경우 사용

리밸런스 리스너를 가진 컨슈머

  • 컨슈머가 추가 또는 제거되면 파티션을 컨슈머에 재할당하는 과정인 리밸런스가 일어난다.
  • poll() 메서드를 통해 반환받은 데이터를 모두 처리하기 전에 리밸런스가 발생하면 데이터를 중복 처리할 수 있다. 왜냐면 poll()로 받은 데이터를 처리하고 있는 단계고 아직 커밋하지 않았기 때문
  • 떄문에 이러한 중복 처리를 방지하기 위해서는 리밸런스 발생 시 처리한 데이터를 기준으로 커밋을 시도해야 한다.
  • 이를 위해 카프카 라이브러리는 ConsumerRebalanceListener 인터페이스를 지원한다. 이를 구현한 클래스는 onPartitionAssigned() 메서드와 onPartitionRevoked() 메서드이다.
  • onPartitionAssigned()는 리밸런스가 끝난 뒤 파티션이 할당 완료되면 호출되는 메서드
  • onPartitionRevoked()는 리밸런스가 시작되기 직전 호출 메서드

파티션을 직접 할당하는 컨슈머

  • 컨슈머 운영시 subscribe() 메서드 외에 직접 파티션을 컨슈머에 명시적으로 할당도 가능
  • assign() 메서드를 이용하면 된다.
consumer.assign(Collections,singleton(new TopicPartition(TOPIC_NAME, PARTITION_NUMBER)));

컨슈머 안전 종료

  • KafkaConsumer 클래스의 wakeup() 메서드 사용
  • 아래와 같이 wakeup() 메서드가 호출되면 WakeupException이 발생한다. 이를 예외처리한 뒤, finally 문에 consumer.close를 호출해주면 된다.
try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record: records) {
                    logger.info("{}", record);
                    currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, null));
                    consumer.commitSync(currentOffsets);
                }
            }    
        } catch (WakeupException e) {
            logger.warn("Wakeup consumer");
        } finally {
            consumer.close();
        }
  • wakeup() 메서드 호출은 셧다운 훅을 사용하면 구현할 수 있다.
// shutdown hook 등록
Runtime.getRuntime().addShutdownHook(new ShutdownThread());

// wakeup 함수 호출하는 shutdown hook
static class ShutdownThread extends Thread {
        public void run() {
            logger.info("Shutdown hook");
            consumer.wakeup();
        }
    }

ML model prediction quality test

model prediction quality test

  • benchmark test
  • differential test

benchmark test

class TestIrisPredictions(unittest.TestCase):
    def setUp(self):
        # We prepare both pipelines for use in the tests
        self.pipeline_v1 = SimplePipeline()
        self.pipeline_v2 = PipelineWithDataEngineering()
        self.pipeline_v1.run_pipeline()
        self.pipeline_v2.run_pipeline()
        
        # the benchmark is simply the same classification value for 
        # for every test entry
        self.benchmark_predictions = [1.0] * len(self.pipeline_v1.y_test)
    
    def test_accuracy_higher_than_benchmark(self):
        # Given
        benchmark_accuracy = accuracy_score(
            y_true=self.pipeline_v1.y_test,
            y_pred=self.benchmark_predictions)
        
        predictions = self.pipeline_v1.predict(self.pipeline_v1.X_test)
        
        # When
        actual_accuracy = accuracy_score(
            y_true=self.pipeline_v1.y_test,
            y_pred=predictions)
        
        # Then
        print(f'model accuracy: {actual_accuracy}, benchmark accuracy: {benchmark_accuracy}')
        self.assertTrue(actual_accuracy > benchmark_accuracy)
        
    def test_accuracy_compared_to_previous_version(self):
        # When
        v1_accuracy = self.pipeline_v1.get_accuracy()
        v2_accuracy = self.pipeline_v2.get_accuracy()
        print(f'pipeline v1 accuracy: {v1_accuracy}')
        print(f'pipeline v2 accuracy: {v2_accuracy}')
        
        # Then
        self.assertTrue(v2_accuracy >= v1_accuracy)
  • 즉, 현재 버전이 벤치마크 스코어보다 정확도가 높은지 테스트
  • 현재 버전이 이전 버전보다 정확도가 높은지 테스트

kafka broker

설치

  • openjdk 8 버전 설치
  • 카프카 공식 홈페이지에서 바이너리 압축 파일 다운로드 진행

힙 메모리 설정

  • 카프카 브로커는 레코드의 내용은 페이지 캐시로 시스템 메모리를 사용하고, 나머지 객체들을 힙 메모리에 저장하여 사용한다는 특징이 있다.
  • 떄문에 카프카 브로커 운영 시에는 힙 메모리를 5GB 이상으로 설정하지 않는 것이 일반적
  • 힙 메모리 사이즈를 미리 환경 변수로 지정해서 주키퍼와 카프카 브로커 실행 시 발생 가능한 힙 메모리 에러를 예방한다.
$ export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"

server.properties 설정

  • kafka broker 클러스터 운영에 필요한 옵션들을 지정할 수 있다.
  • advertised.listener: 카프카 클라이언트 또는 커맨드 라인 툴을 브로커와 연결할 때 사용된다.

7 / 6 TODO

  • leetcode medium 1 문제 풀기
  • 블로그에 알고리즘 문제 풀이 간단히 작성하기
  • solid, 디자인 패턴 관련 책 선정하고, single responsibility 관련 내용 숙지하기
  • srcp 최신 모델 학습, 데이터 뽑기
  • shopping SAS fetch spec 코드 작성하기

kafka toy project

적재 정책

  • 데이터의 유실이나 중복 적재가 무관하다면 파이프라인 운영 난이도는 낮아진다.
  • 데이터 유실이나 중복 없이 정확히 한번 적재되는 정책이면 난이도가 올라간다.
  • 적재는 프로듀서부터 컨슈머를 넘어 최종적으로 하둡이나 엘라스틱서치까지 저장되는 것을 의미
  • 정확히 한번 전달은 되더라도 정확히 한번 적재는 되지 않을 수 있다.
  • 이 모든 것들을 트랜젝션으로 처리할 수는 없다. 만일 HDFS 적재, S3 적재 컨슈머 애플리케이션에서 장애가 발생하면 적재에 실패할 수도 있다.
  • 이럴 경우 특정 이슈 발생 시점을 확인하고 그 시점의 파티션 오프셋 부터 다시 적재할 수 밖에 없다.
  • 정확히 한번 적재가 필요할 경우엔 멱등성 프로듀서를 사용하고 RDBMS를 사용하는 것이 바람직하다.

fastapi deployment

Reference

fastapi 배포

  • 로컬 환경에서는 uvicorn --reload로 띄워놓고 작업을 한다.
  • 프러덕션 환경에서는 멀티 프로세스를 띄워야 하는데 이 떄 여러 선택지가 존재한다.
    • uvicorn --workers={num_workers}
    • gunicorn -k uvicorn.workers.UvicornWorker -w {num_workers}
    • single uvicorn container and kubernetes
  • uvicorn 레포의 discussion을 보면 uvicorn의 multiprocess 기능은 불완전해서 gunicorn을 통해서 사용할 것을 권장한다.
  • 쿠버네티스 환경일 경우에는 단일 컨테이너에 uvicorn single process를 띄우고, 컨테이너를 여러개 띄우는 방식을 권장한다.

vimrc

set ts=4
set sw=4
set et
set hls
set nu
syntax on

call plug#begin('~/.vim/plugged')

Plug 'davidhalter/jedi-vim'
let g:jedi#show_call_signatures=0       " 자세히 설명하는 창을 보여준다 1=활성화, 0=비>활성화
let g:jedi#popup_select_first="0"       " 자동완성시 자동팝업 등장 x
let g:jedi#force_py_version=3           " 자동완성 3 = python3 , 2 = python2

Plug 'hynek/vim-python-pep8-indent'   " python 자동 들여쓰기 Plugin
filetype plugin indent on               " python 자동 들여쓰기 on

Plug 'nvie/vim-flake8'                " python 문법 검사 plugin
let g:syntastic_python_checkers=['flake8']        " ↓ 실행시 현재줄 주석 해제필요
" let g:syntastic_python_flake8_args='--ignore='    " 무시하고자 하는 errorcode

Plug 'preservim/nerdtree'

call plug#end()

좋은 팀 문화에 대해 느끼는 점

코드 리뷰의 필요성

  • 서로의 코드를 신경써야 한다. 장애가 나면 그 팀원 탓을 하는 것이 아닌, 그 전에 리뷰를 꼼꼼하게 하지 못한 모두의 탓이 되어야 한다.
  • 테스트 프로세스가 정립되어 있어야한다. API 호출이 포함되면 timelimit, retry 등의 기준이 미리 정해져 있고, 이를 테스트 과정에서 충분히 검증했는 지를 확인할 수 있어야한다.
  • 그러기 위해 리더의 역할이 중요하다고 생각한다. 리더가 신경을 쓰고 있고, 코드를 살펴보고, 소프트웨어 제품의 품질에 신경을 써야한다. 문화를 만들어나가는건 팀원 모두이지만, 중심은 팀장이라고 생각한다.

Asgi for web development

Reference

wsgi limitations

  • websockets
  • HTTP/2
  • can't use async or await

from aiohttp import web

async def hello(request):
    await db.execute("SELECT * FROM tbl")
    return web.Response(text="Hello, world")

app = web.Application()
app.add_routes([web.get('/', hello)])
web.run_app(app)

2019 frameworks

  • aiohttp, sanic, japronto, vibora, fastAPI 등 async를 기반으로 한 프레임워크들이 등장하기 시작했다.
  • 즉, 다양한 async web application framework 들이 등장했다. WSGI 때와 마찬가지로 이식성을 높여서 유저 편의성을 높여줄 필요성이 제기 되었고 이것이 ASGI의 모티베이션이 되었다.
  • 현재 어떤 서버를 사용하던지와 상관없이 web appliction framework를 선택할 수 있도록 선택지를 제공해준다.

ASGI

async def application(scope, receive, send):
    event = await receive()
    await send(
        {"type": "websocket.send", ...}
    )
async def app(scope, receive, send):
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            (b"content-type", "text/plain"),
        ],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello, world!",
    })

ML system testing concepts

why test?

  • confidence of the system을 측정할 수 있어야 한다.
  • moitoring historic system behavior를 분석해라

predicticting reliability

  • 대부분의 소프트웨어 시스템은 변화한다.
  • 이 떄, 변화가 시스템에 어떠한 영향을 줄 지 예측을 할 수 있어야 한다.

functionality

  • confidence that functionality remains unchanged
  • 즉, 테스트란 우리 시스템이 우리가 기대한 대로 동작하는 지를 보여주는 방식이다. 설령 시스템에 변화가 있다 하더라도.

wsgi for web developer

Reference

Follow the requests

  • django로 된 app application server를 실행시키려면 아래와 같은 명령어를 사용한다.
$ gunicorn --workers=2 mydjangoapp.wsgi
  • 즉, gunicorn과 django app을 연결해주는 것은 wsgi이다.
  • 그런데 wsgi란 무엇일까?

wsgi

  • PEP 333(3)에서 규정이 되어 있다.
  • 웹 서버와 python web application applications or frameworks 사이의 표준 인터페이스를 제공한다.
  • 이를 통해 web application portability across a variety of web servers 를 높여준다.
  • 여기서의 web server는 apache, nginx 등의 솔루션을 말한다.
  • 즉 앞단에 어떤 web server를 두더라도 wsgi 서버를 중간에 두면 뒷 단에 web applicaiton 서버와 효환이 된다!

static web severs

  • 서버에 저장된 정적인 파일들을 서빙해준다. 이 떄 캐시 등을 활용한다.
  • 그러나 이는 정적 파일만 해당한다.

Common Gateway Interface

  • dyanmic content를 제공하기 위해서 탄생한 것이 CGI이다.
  • 그러나 이는 매 request마다 restart를 해주어야하는 단점이 있었다.

WSGI

  • fast, dynamic, pythonic한 방식을 원했다!

Gunicorn

  • PEP 3333 wsgi의 구현체이다.
  • django, flask, bottle과 같은 프레임워크들은 wsgi-compliant web app이다.
  • gunicorn은 wsgi-compliant web-app 들을 동작시킬 수 있는 wsgi server이다.
  • gunicorn은 pre-fork worker 모델을 사용한다.

스크린샷 2021-09-13 오전 11 41 37

kafka streams

kafka streams

  • 토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리
  • 카프카 스트림 처리를 위해 Spark, Flink, Storm, Fluentd와 같은 오픈 소스들도 존재
  • 그럼에도 카프카 스트림의 장점은 일단 공식 지원 라이브러리. 카프카 버전이 오를 때마다 함께 릴리지 됨
  • 신기능 추가가 활발함. 안정성이 매우 뛰어남

14

  • 스트림즈 애플리케이션은 JVM 위에서 하나의 프로세스로 실행됨

카프카 스트림즈 애플리케이션

  • 내부적으로 1개 이상의 스레드 생성이 가능
  • 스레드는 1개 이상의 테스크를 가짐
  • 테스크는 데이터 처리 최소 단위

topology

15

  • 카프카에서는 토폴로지를 이루는 노드를 하나의 프로세서라고 부르고 노드와 노드를 이은 선을 스트림이라 부른다.
  • 스트림은 토픽의 데이터를 뜻하며 이는 레코드와 동이랗다.
  • 프로세서: 소스 프로세서, 스트림 프로세서, 싱크 프로세서 3가지가 있다.
    • 소스 프로세서: 토픽에서 데이터를 가져오는 역할
    • 스트림 프로세서: 다른 프로세서가 반환한 데이터를 처리하는 역할
    • 싱크 프로세서: 데이터를 특정 카프카 토픽으로 저장하는 역할을 수행, 최종 종착지
  • 스트림즈DSL과 프로세서 API 2가지 방법으로 개발이 가능

스트림즈 DSL 데이터 처리 예시

  • 메시지 값을 기반으로 한 토픽 분기 처리
  • 지난 10분간 들어온 데이터의 개수 집계
  • 토픽과 다른 토픽의 결합으로 새로운 데이터 생성

프로세서 API로 구현하는 데이터 처리 예시

  • 메시지 값의 종류에 따라 토픽을 가변적으로 전송
  • 일정한 시간 간격으로 데이터 처리

kafka 사용 인프라 아키텍쳐

예시 데이터 파이프라인

20

  • 보통 프런트엔드에서 사용자 이벤트를 받으면 이를 스프링 기반의 API 서버를 찌른다. 이게 프로듀서가 된다.
  • 프로듀서 앱은 카프카 클러스터에 데이터를 적재한다.
  • 이를 컨슈머 단에서 전달 받아서 하둡에 적재한다.
  • 카프카 커넥트를 이용해서 엘라스틱 서치에 적재하고, 키바나로 시각화 하기도 한다.

스케일 아웃

  • 웹 이벤트가 늘어날 경우에는 먼저 프로듀스 앱 스케일 아웃을 적용한다.
  • 그 다음 카프카 파티션 수를 늘려준다.
  • 파티션 수를늘려주면 이에 따라 컨슈머 앱의 스레드 개수를 늘려주어야 한다.
  • 카프카 커넥트의 경우에는 테스크 개수를 늘려주면 된다.

fastapi orm

ORM

  • fastapi도 마찬가지로 sqlalchemy와 같은 ORM을 사용한다.
  • pydantic은 어디까지나 data validation을 담당한다. 이는 ORM을 대체할 수 없다.
  • 따라서 fastapi 프로젝트를 진행할 때에는 ORM model 파일과 pydantic 스키마 파일을 별도로 만들어주어야한다.

fastapi tutorial

FastAPI

  • starlette 객체를 바로 상속받는다.
  • starlette은 ASGI를 구현하는 python web framework이다. 그 자체로 사용도 가능하고, sdk로 사용도 가능하다.
  • fastapi는 이 starlette를 기반으로 한다.
class FastAPI(Starlette):
    ...

API 정의하기

from fastapi import FastAPI

app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World"}
  • 앱 객체를 만든 뒤, 데코레이터를 이용해서 method, url을 등록해준다.
  • 이 때 async를 붙여도 되고, 안붙여도 된다.
  • 안붙이게 될 경우 그냥 synchronous하게 동작을 처리한다.

20210727-20210801

Weekly Goals

  • use ML serving stacks
    • TF serving
    • Pytorch serving
    • bentoML
    • sheldonML
    • KF serving
  • take all helm courses

pydantic

pydantic

  • python annotation으로 data validation, settings management를 제공
  • model class를 만들고 그 안에 멤버 변수들에 타입 검사를 제공한다.
class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []
  • id처럼 타입만 붙어있으면 required field라는 의미이다. 여기에 문자열, 바이트, 숫자 등이 오면 int로 변환해버린다. 변환 불가시에는 exception을 띄워버린다.
  • name의 경우에는 기본값이 주어진 string 변수이다.
  • signup_ts는 optional 하지만 기본값으로 None을 갖는다.
  • friends는 int list 타입을 갖는다. 그 외의 타입이 전달될 경우 int로 변환 가능하면 변환시킨다.

pydantic 장점

  • IDE와 궁합이 좋다.
  • data validation과 system setting loading에 사용이 가능하다. (dual use)
  • 빠르다.
  • 복잡한 자료구조의 validation을 수행할 수 있다.
  • 확장 가능하다.
  • dataclass를 덧붙일 수 있다. (아직 어떤 의미인지 모르겠음)

kafka connect

Kafka Connect

  • 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션
  • 특정한 작업 형태를 템플릿으로 만들어놓고 커넥터를 실행함으로써 반복을 줄일 수 있다.

커넥터의 두가지 종료

  • 소스 커넥터: 프로듀스 역할 수행
  • 싱크 커넥터: 컨슈머 역할 수행

17

커넥트 동작 방식

  • 사용자가 커넥트에 생성 명령을 내리면 커넥트는 내부에 커넥터와 테스크를 생성한다.
  • 커넥터는 테스크들을 관리한다. 테스크는 커넥터에 종속되는 개념으로 실제 데이터 처리를 진행한다.
  • 그렇기에 데이터 처리를 정상적으로 하는지 확인하기 위해서는 각 테스크의 상태를 확인해야한다.

20210719 ~ 20210725 TODO

Weekly Plan

  • (work) finish srcp project
  • (work) review quickwit
  • (work) study kubeflow
  • (stock) finish reading stock price book(미국 주식으로 부자되기) 2 times and summarize it. Do real practice
  • (mlops) finish read kubeflow book
  • (algorithm) solve +10 leetcode problems
  • (exercies) run +5km on everyday morning, 100 push ups, 30 pull ups

cs 기초 - 소팅

  • 정렬 알고리즘들의 종류와 time complexity, 적합한 사용처 등을 조사한다.

db 면접 질문 아카이빙

Relational Database란?

  • 관계형 모델을 기반으로 데이터를 저장하는 프로그램.
  • 관계형 모델은 데이터를 테이블 형식으로 정의

스크린샷 2021-06-29 오후 1 55 38

  • table을 relation이라 부르며 이는 여러개의 column으로 구성, column은 Attribute라고도 부른다.
  • 각각의 row는 tuple이라고 부른다.
  • 이러한 relation에 여러 제약 조건을 부여할 수 있다.
  • 주로 SQL을 이용하여 관리할 수 있다.

Non Relational Database란?

  • 관계형 모델이 아닌, 다른 데이터 모델을 기반으로 데이터를 저장하고 관리하는 프로그램

간단히 암기해야할 코드들

merge sort

def merge(arr, l, m, r):
    n1 = m - l + 1
    n2 = r- m
  
    # create temp arrays
    L = [0] * (n1)
    R = [0] * (n2)
  
    # Copy data to temp arrays L[] and R[]
    for i in range(0 , n1):
        L[i] = arr[l + i]
  
    for j in range(0 , n2):
        R[j] = arr[m + 1 + j]
  
    # Merge the temp arrays back into arr[l..r]
    i = 0     # Initial index of first subarray
    j = 0     # Initial index of second subarray
    k = l     # Initial index of merged subarray
  
    while i < n1 and j < n2 :
        if L[i] <= R[j]:
            arr[k] = L[i]
            i += 1
        else:
            arr[k] = R[j]
            j += 1
        k += 1
  
    # Copy the remaining elements of L[], if there
    # are any
    while i < n1:
        arr[k] = L[i]
        i += 1
        k += 1
  
    # Copy the remaining elements of R[], if there
    # are any
    while j < n2:
        arr[k] = R[j]
        j += 1
        k += 1
  
# l is for left index and r is right index of the
# sub-array of arr to be sorted
def mergeSort(arr,l,r):
    if l < r:
  
        # Same as (l+r)//2, but avoids overflow for
        # large l and h
        m = (l+(r-1))//2
  
        # Sort first and second halves
        mergeSort(arr, l, m)
        mergeSort(arr, m+1, r)
        merge(arr, l, m, r)

Monitoring Machine Learning Models in Production

1. The ML system life cycle

  • continuous delivery for machine learning (CD4ML)

스크린샷 2021-09-09 오후 7 00 38

  1. Model building
  • understanding the problem
  • data preparation
  • feature engineering
  • initial code, rough jupyter notebook
  1. Model Evaluation and Experimentation
  • feature selection
  • hyper paramter tuning
  • comparing the effectiveness of different algorithms on the given problem
  • notebooks with stats, graph evaluating feature weights, accuracy, precision, Receiver Operating Characteristics
  1. Productionize Model
  • preparing it so it can be deployed
  • production grade code
  • different programming language and framework
  1. Testing
  • ensuring that production code behaves in the way we expect it to
  • test cases
  1. Deployment
  • API for accessing the model
  1. Monitoring and Observability
  • ensure our model is doing what we expect it to in production.

kafka main concepts

카프카 브로커, 클러스터, 주키퍼

  • log directory에 가보면 토픽명-파티션번호로 디렉터리가 만들어진 것을 확인할 수 있다.
  • 그리고 consumer_offset도 순서대로 저장되어 있는 것을 볼 수 있다.
  • 파티션 폴더 아래에는 다음과 같은 파일들이 들어있다.
$ ls hello.kafka-0/
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint
  • .log 파일에는 메시지와 메타데이터, index는 메시지의 오프셋을 인덱싱한 정보를 담는다. timeindex 파일에는 메시지에 포함된 timestamp 값을 기준으로 인덱싱한 정보가 담겨있다.
  • 카프카는 OS의 페이지 캐시를 이용한다. JVM 위에서 동작하는 카프카 브로커가 페이지 캐시를 사용하지 않는다면 지금과 같은 빠른 동작을 기대할 수 없다. 왜냐면 캐시를 직접 구현 시에는 데이터의 지속적인 변경으로 GC가 자주 일어나 속도가 느려질 것이기 때문
  • 때문에 카프카 브로커는 실행 시에 힙 메모리 사이즈를 크게 설정할 필요가 없는 것이다.

데이터 복제, 싱크

  • 데이터 복제는 파티션 단위로 이루어짐
  • replication 가능 최대 값은 브로커의 개수
  • 복제된 파티션은 리더와 팔로워로 구성
  • 프로듀서 또는 컨슈머와 직접 통신하는 파티션을 리더, 나머지를 팔로워라 부른다.
  • 팔로워 파티션은 리더 파티션의 오프셋을 확인해서 자신의 오프셋과 차이가 나면 리더로부터 데이터를 가져와서 자신의 파티션에 저장하며, 이를 복제라고 부른다.
  • 리더 파티션이 저장되어 있는 브로커가 다운되면 팔로워 파티션 중 하나가 리더로 선출된다.
  • 토픽에 따라서 레플리카의 개수를 다르게 설정하기도 한다. 유실이 발생하더라도 처리 속도가 중요하다면 1이나 2, 유실 되면 안되는 중요한 데이터의 경우엔 3 이상으로 설정한다.

kafka producer api

카프카 클라이언트

  • 카프카 클러스터에 명령을 내리거나 데이터를 송수신하기 위해 카프카 클라이언트 라이브러리는 카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공하는 카프카 클라이언트를 사용해서 애플리케이션을 개발한다.
  • 프로듀서 API: 전송 시 리더 파티션을 가지고 있는 카프카 브로커와 통신, 데이터를 직렬화 하여 전송, 자바 기본형 데이터 뿐만 아니라 동영상, 이미지 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.
public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);
        producer.flush();
        producer.close();
    }
}
  • 실제 운영 환경에서는 데이터가 정상적으로 전송되었는지 확인하는 로직, 환경에 따른 추가적인 프로듀서 선택 옵션들의 설정 등과 같은 로직들이 포함된다.
  • 애플리케이션 단에서 토픽에 레코드를 전송하기 전에 카프카 커맨드를 이용해서 미리 토픽을 만들어준다.

11

  • 프로듀서 애플리케이션은 카프카 클러스터로 레코드를 전송한다.
  • 이 떄 내부적으로 파티셔너, 어큐물레이터, 센더를 거치게 된다.
  • 레코드 생성 시 파티션을 직접 지정하거나 타임스탬프 지정, 메세지 키 지정 등이 가능하다.
  • 프로듀서 인스턴스의 send()를 호출하게 되면 먼저 파티셔너에서 토픽의 어느 파티션으로 전송할 것인지 결정된다.
  • 파티션이 구분된 데이터는 어큐물레이터에 토픽별로 버퍼에 쌓인 다음 배치 단위로 전송된다.

프로듀서 필수 옵션

  • bootstrap.servers: 프카프카 클러스터에 속한 ㅍ브로커의 호스트 이름:포트를 1개 이상 작성
  • key.serializer: 레코드의 메시지 키를 직렬화 하는 클래스를 지정
  • value.serializer: 레코드의 메시지 값을 직렬화하는 클래스 지정

프로듀서 선택 옵션

  • acks: 프로듀서가 전송한 데이터가 브로커들에 정상적으로 저장되었는지 전송 성공 여부를 확인하는데 사용
    • 0, 1, -1 중 택 1, 1은 리더 파티션에 데이터가 저장되면 전송 성공, 0은 프로듀서가 전송한 즉시 브로커에 저장 여부와 상관없이 성공, -1은 리더 파티션과 팔로워 파티션에 데이터가 저장되면 성공하는 것으로 판단
  • buffer.memory: 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리 양
  • retries: 브로커로부터 에러를 받고 난 뒤 재전송 시도 횟수 설정
  • batch.size: 배치로 전송할 레코드 최대 용량 지정, 기본값은 16384
  • linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간, 기본값은 0
  • partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티션 클래스
  • enable.idempotence: 멱등성 프로듀서로 동작할 지 여부를 설정
  • transactional.id: 프로듀서가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 설정

메시지 키를 가진 데이터 전송 프로듀서

// key 설정 레코드
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");

// partition 지정 레코드
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, 0, "Pangyo", "23");

커스텀 파티셔너

  • Partitioner 인터페이스를 직접 구현해서 특정 키가 특정 파티션에 배정되도록 적용할 수 있다.

브로커 정상 전송 여부 확인 프로듀서

ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
  • producer.send(record).get() 함수를 호출하면 동기적으로 프로듀서가 보낸 데이터의 결과를 확인할 수 있다.
  • 레코드가 정상적으로 브로커에 적재되면 토픽 이름-파티션번호-오프셋번호 가 리턴된다. ex) test-1@3
  • 콜백 함수를 만든 뒤, send 함수에 콜백 함수를 추가해서 전송하면 비동기적으로 데이터 전송 결과를 확인할 수 있다.

heap 자료구조

  • priority queue를 구현할 때 heap을 사용한다. 하지만 그 원리나 구현을 자세히 알지 못하므로 정리할 것

fastapi dependencies

Dependency Injection

  • 아래와 같은 상황에 사용하는 프로그래밍 기법
    • shared logic
    • share database connections
    • security, authentification, role requirements
from typing import Optional

from fastapi import Depends, FastAPI

app = FastAPI()


async def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 100):
    return {"q": q, "skip": skip, "limit": limit}


@app.get("/items/")
async def read_items(commons: dict = Depends(common_parameters)):
    return commons


@app.get("/users/")
async def read_users(commons: dict = Depends(common_parameters())):
    return commons
  • Depends를 이용해서 common_parameters 함수에 의존성을 추가함. 마치 Body, Query 등을 쓰는 것과 똑같음
  • items나 users로 새로운 요청이 들어올 경우 먼저 dependency를 호출한다.
  • 즉, 특정 path operation 단에서는 이것이 의존하고 있는 것을 지정만 하면 FastAPI가 이를 수행하고, 그 결과값만 해당 path operation에 주입해주게 된다. 이를 의존성 주입이라고 한다.
  • dependency injection을 의미하는 다른 용어들로는 아래와 같은 것들이 있다.
    • resources
    • providers
    • services
    • injectables
    • components

DS school - Unit testing production model

python convention

type hints

import typing as t

def add_two_integers(first: int, second: t.Optional[int] = None) -> int:
    """Sum two numbers."""
    result = first
    if second:
        result = first + second
    return result

Dependency Injection

참고

Dependency Injection이란?

  • 하나의 객체가 이것이 의존하고 있는 다른 객체들을 수신하는 기법이다.
  • 수신하는 측을 클라이언트, 전달되는 측을 서비스라고 부른다.
  • 서비스를 클라이언트에 건네주는 코드를 인젝터라고 부른다.
  • 클라이언트가 어떤 서비스를 사용할 지 명시하는 대신에 인젝터가 클라이언트에게 어떤 서비스를 써도 되는지 알려주는 것이다.
  • 인젝션은 의존성의 전달을 지칭한다.
  • 서비스는 클라이언트의 상태로 구성된다. 클라이언트가 직접 서비스를 만들거나 찾도록 하는 것 대신 서비스를 클라이언트에 전달하는 것이 이 패턴의 핵심이다.

Dependency Indejection은 왜 필요할까?

  • 관심의 분리를 목적으로 한다.
  • 즉, 객체의 생성과 사용을 분리한다. 이는 코드의 가독성과 재사용성을 높여준다.
  • 이는 Inversion of Control의 한 기법에 해당한다.
  • 서비스를 호출하는 클라이언트의 입장에서는 그 서비스들을 어떻게 만드는지 몰라도 된다.
  • 대신에 클라이언트는 서비스를 제공하는 것에 대한 책임을 외부 코드에 위임한다.
  • 클라이언트는 인젝터의 코드를 호출하지 않는다. 서비스를 만드는 것은 어디까지나 인젝터이다.
  • 이는 클라이언트가 인젝터에 대해서도 몰라도 되고, 서비스를 어떻게 만드는지도 몰라도 되고, 어느 서비스를 이것이 사용하고 있는지도 몰라도 된다.
  • 클라이언트는 단지 서비스의 인터페이스만 알면 된다.
  • 이는 즉, "use"와 "construction"에 대한 책임을 분리하는 것이다.

log collectors

log collector

  • apache access log와 같이 로그를 파일로 써주는 애플리케이션들이 있다.
  • 파일 형태의 로그를 카프카나 엘라스틱 서치에 밀어넣어 주기 위해 필요한 기술 스택이 log shipper이다.
  • 대표적인 오픈소스로는 logstash, fluentd, filebeat이 있다.

kafka topic and partition

토픽 생성 시 파티션 개수 고려사항

  • 데이터 처리량

  • 메세지 키 사용 여부

  • 브로커, 컨슈머 영향도

  • 파티션 개수가 많아지면 1:1 매칭되는 컨슈머 개수가 늘어난다. 따라서 데이터 처리량 측정이 중요하다.

데이터 처리량

  • 컨슈머의 처리량을 늘리는 것 (컨슈머 서버의 스케일 업)
  • 컨슈머를 추가하여 병렬처리량을 늘리는 것 (가장 확실)
  • 파티션 개수만큼 컨슈머를 추가하는 방법이 데이터 처리량을 늘리는 가장 확실한 방법
  • 그러므로 프로듀서가 보내는 데이터 양과 컨슈머의 처리량을 계산하면 파티션 개수가 나온다.
producer 데이터 전송량 < 컨슈머 처리량 x 파티션 개수
  • 컨슈머 데이터 처리량을 구하기 위해서는 더미 데이터로 테스트를 진행해볼 것

python 면접 질문 아카이빙

  • python에서 str은 불변 객체이다. 왜일까?

  • python은 원시 타입을 지원하는가? 그렇지 않다면 그 이유는?
    python은 원시 타입을 지원하지 않는다. python에서 모든 자료형은 객체로 취급된다. 이는 언어 차원에서 강력한 기능을 제공해서 사용자의 편의성을 높이기 위한 조치이다.

  • python이 느린 이유를 아는대로 설명하라
    먼저 인터프리터 언어이기 때문에 컴파일이 없다. 코드 한 줄씩 기계어로 번역하여 실행하다 보니 오버헤드가 발생한다. 다음으로 원시 타입을 지원하지 않는다. 때문에 연산을 많이 수행할 때 오버헤드가 발생한다. 마지막으로 GIL의 존재로 멀티 코어를 활용한 성능 향상을 구현하는데 한계가 있다.

  • GIL을 설명하라
    python interpreter가 관리하는 일종의 mutex로 한번에 하나의 스레드가 코어를 점유하게끔 한다. 따라서 python 단에서 여러 쓰레드를 사용한다 하더라도 실제 하드웨어 코어는 하나만 사용하게 된다. 파이썬에서 멀티 코어를 활용하려면 멀티 프로세스를 사용해야 한다.

  • python에서 숫자는 int로만 처리된다. 다른 언어는 크기별로 다양한 타입을 지원하는데 어떻게 가능할까?
    python에서의 int는 객체이다. 그리고 이는 무제한 자릿수 기능을 제공한다. int는 정수를 숫자의 배열로 간주한다. 즉, 자릿수 단위로 쪼개어 배열 형태로 표현한다. 이는 언어 차원에서 지원하는 강력한 기능이지만 그만큼 계산 성능과 메모리 사용량을 포기하는 조치이다.

  • list를 dictionary의 키 값으로 사용할 수 없는 이유를 설명하라
    list는 mutable한 자료형이다. dictionary는 키 값을 hash 하는데 키 값이 변경 가능하면 안된다. 따라서 list는 키 값으로 사용이 불가하고, immutable한 tuple은 사용 가능하다.

  • list comprehension을 설명하라
    list 문법을 사용해서 새로운 list를 정의하는 기법으로 가독성이 좋아 python에서 많이 사용된다. map이나 filter같은 함수형 문법의 사용보다 list comprehension을 사용하는게 더 pythonic하다고 권장된다.

  • generator를 설명하라
    yeild를 사용하여 생성이 가능, 리턴값을 전달하고 반복의 위치를 기억, 다음번 호출 시엔 그 다음 시행을 한 뒤 값을 리턴해주어 메모리 efficient하게 반복을 구현할 수 있다. 딥 러닝에서 data loader를 만들 때 사용하기도 한다.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.