Skip to content

Latest commit

 

History

History
379 lines (296 loc) · 12.4 KB

멀티 스레드 웹 서버 만들기.md

File metadata and controls

379 lines (296 loc) · 12.4 KB

https://doc.rust-lang.org/book/ch20-00-final-project-a-web-server.html
Docs에 있는 예제를 따라 정리한 글임을 미리 밝힙니다.

  • 요청했을 때 아래와 같은 웹 화면을 반환하는 웹 서버를 만들어보자!
  • Rust 프로젝트는 아래와 같이 생성할 수 있다.
    $ cargo new hello --bin
        Created binary (application) `hello` project
    $ cd hello

TCP 연결 처리

  • 우선 std::net를 사용해서 하나의 TCP 연결을 처리하는 코드를 짜볼 것이다.
  • TcpListener 를 사용하여 127.0.0.1:7878 주소로 TCP연결을 수신할 수 있다.
  • 위 코드에서 bind 함수는 new 함수처럼 동작하며 TcpListner의 새 인스턴스를 반환한다.
    • bind 함수는 바인딩의 성공여부를 나타내는 Result<T, E>를 반환한다.
  • TcpListener의 incoming 메소드는 스트림의 차례에 대한 iterator를 반환한다. 각각의 stream 은 클라이언트와 서버간의 열려있는 커넥션을 의미한다.
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        println!("Connection established!");
    }
}

요청 데이터 읽기

  • 브라우저로부터의 요청을 읽는 기능을 구현하기 위해 handle_connection 이라는 함수를 새로 만들어보자.
use std::io::prelude::*;
use std::net::TcpStream;
use std::net::TcpListener;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 512];

    stream.read(&mut buffer).unwrap();

    println!("Request: {}", String::from_utf8_lossy(&buffer[..]));
}
  • std::io::prelude를 가져와 스트림으로부터 읽고 쓰는것을 허용하는 특성에 접근할 수 있도록 한다.

  • handle_connection 함수에선, stream 매개변수를 가변으로 만들어 줬다. 이유는 TcpStream 인스턴스가 내부에서 어떤 데이터가 우리에게 반환되는지 추적하기 때문이다. 요청하는것에 따라 더 많은 데이터를 읽거나, 다음 요청때까지 데이터를 저장하는 등 내부의 상태가 변경될 수 있기에 mut이 되어야 한다.

  • 이제 실제로 스트림으로부터 데이터를 읽어보자.

  • 데이터를 읽기 위해선 buffer(버퍼)를 읽을 데이터를 저장할 스택에 선언해야 한다. 그리고 버퍼를 stream.read 로 전달하여 TcpStream으로부터 읽어들인 바이트를 버퍼로 집어넣는다.

  • 이후 버퍼 안에있는 바이트들을 문자열로 변환하고 출력한다. String::from_utf8_lossy 함수는 &[u8] 을 전달받고 String 으로 바꿔서 제공해준다.

  • 프로그램을 실행하고 TCP로 접속하면 아래와 같이 출력될 것이다.

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.42 secs
     Running `target/debug/hello`
Request: GET / HTTP/1.1
Host: 127.0.0.1:7878
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64; rv:52.0) Gecko/20100101
Firefox/52.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Connection: keep-alive
Upgrade-Insecure-Requests: 1
������������������������������������

HTTP 응답

  • handle_connection 함수의 println!을 지우고 원하는 HTTP 형식의 메시지를 반환해보자.

  • 우선 간단한 html을 작성한다.

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="utf-8">
        <title>Hello!</title>
    </head>
    <body>
        <h1>Hello!</h1>
        <p>Hi from Rust</p>
    </body>
    </html>
  • HTML파일을 읽고, 응답의 body에 추가, 전송하도록 코드를 수정한다.

    use std::fs::File;
    // --생략--
    
    fn handle_connection(mut stream: TcpStream) {
        let mut buffer = [0; 512];
        stream.read(&mut buffer).unwrap();
    
        let mut file = File::open("hello.html").unwrap();
    
        let mut contents = String::new();
        file.read_to_string(&mut contents).unwrap();
    
        let response = format!(
            "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}",
            contents.len(),
            contents
        );
    
        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }
  • HTTP MethodGET인 경우에만 Hello 페이지를 반환하고, 그렇지 않은 경우 404를 반환하도록 하려면 html을 추가해주고 if문으로 분기처리해주면 된다.

    // --생략--
    
    fn handle_connection(mut stream: TcpStream) {
        // --생략--
    
        let (status_line, filename) = if buffer.starts_with(get) {
            ("HTTP/1.1 200 OK", "hello.html")
        } else {
            ("HTTP/1.1 404 NOT FOUND", "404.html")
        };
    
        let mut file = File::open(filename).unwrap();
        let mut contents = String::new();
    
        file.read_to_string(&mut contents).unwrap();
    
        let response = format!(
            "{}\r\nContent-Length: {}\r\n\r\n{}",
            status_line,
            contents.len(),
            contents
        );
    
        stream.write(response.as_bytes()).unwrap();
        stream.flush().unwrap();
    }

멀티스레드로 바꾸기

  • 스레드풀은 대기중이거나 작업을 처리할 준비가 되어 있는 스레드들의 그룹이다.
  • 요청이 들어온다면, 요청들은 처리를 위해 풀로 보내지고 풀에선 들어오는 요청들에 대한 큐(queue)를 유지하도록 할 것이다.
  • 풀 내의 각 스레드들은 이 큐에서 요청을 꺼내서 처리하고 또 다른 요청이 있는지 큐에 물어보며 여러 요청을 동시에 처리한다.
  • 구조는 다음과 같다.
  1. ThreadPool 은 채널을 생성하고 채널의 송신단을 유지한다.
  2. 각 Worker 는 채널의 수신단을 유지한다.
  3. 우린 채널로 전송하려는 클로저를 저장할 새로운 Job 구조체를 생성한다.
  4. execute 메소드는 채널의 송신단으로 실행하려는 작업을 전송한다.
  5. 스레드에선 Worker 가 채널의 수신단에서 반복되며 수신되는 모든 작업의 클로저를 실행한다.
use std::thread;
type Job = Box<FnOnce() + Send + 'static>;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --생략--

    pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static
    {
        let job = Box::new(f);
        self.sender.send(job).unwrap();
    }

    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender,
        }
    }
    // --생략--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {} got a job; executing.", id);
                job.call_box();
            }
        });

        Worker {
            id,
            thread,
        }
    }
}

Graceful한 종료

  • Graceful한 종료란, 서버 종료 신호를 받았을 때 현재 처리하고 있는 요청을 모두 처리할 때 까지 기다린 후 종료하는 것을 의미한다.

  • 풀 안의 각 스레드 상에서 join을 호출하여 스레드가 종료되기 전에 그들이 처리하던 요청을 마저 처리할 수 있도록 하기 위하여 Drop 트레잇을 구현해보자. 아래와 같이 Drop에서 worker의 Thread들에 join하여 종료하길 기다리도록 하는 것이 목표이다.

    ...
    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);
                thread.join().unwrap();
            }
        }
    }
    ...
  • 하지만 이 코드는 아직 작동하지 않는다. worker를 가변 형태로 빌려왔으니, 인수의 소유권을 필요로 하는 join을 호출할 수 없다. 이 이슈를 해결하기 위해, join이 스레드를 사용할 수 있도록 thread의 소유권을 Worker 인스턴스로부터 빼내야 한다.

  • WorkerOption<thread::>JoinHandle<()>를 대신 갖도록 하면, Optiontake 메소드를 사용하여 Some variant에서 값을 빼내고 None으로 대체할 수 있다. 즉, 실행중인 Worker는 thread에 Some variant 를 갖게 되고, 우린 worker를 종료하고자 할때 SomeNone으로 대체하여 worker가 실행할 스레드를 없앨 수 있다.

    ...
    struct Worker {
        id: usize,
        thread: Option<thread::JoinHandle<()>>,
    }
    
    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
            // --생략--
    
            Worker {
                id,
                thread: Some(thread),
            }
        }
    }
  • worker로 부터 thread를 빼내기 위해선 Option에서 take를 호출해야하므로 이에 맞춰 drop 함수의 내용을 조금 수정해준다. Some을 파괴하고 스레드를 얻기 위해 if let를 사용한다.

    impl Drop for ThreadPool {
        fn drop(&mut self) {
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);
    
                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }
  • Job이나 리스닝을 멈추고 무한 반복문을 탈출하라는 신호를 기다리도록 스레드를 수정한다. 두 variant를 가진 enum을 만들어 Job 대신 해당 enum을 가지도록 구현해보자.

    ...
    enum Message {
        NewJob(Job),
        Terminate,
    }
    ...
    
    pub struct ThreadPool {
        workers: Vec<Worker>,
        sender: mpsc::Sender<Message>,
    }
    
    // --생략--
    
    impl ThreadPool {
        // --생략--
    
        pub fn execute<F>(&self, f: F)
            where
                F: FnOnce() + Send + 'static
        {
            let job = Box::new(f);
    
            self.sender.send(Message::NewJob(job)).unwrap();
        }
    }
    
    // --생략--
    
    impl Worker {
        fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) ->
            Worker {
    
            let thread = thread::spawn(move ||{
                loop {
                    let message = receiver.lock().unwrap().recv().unwrap();
    
                    match message {
                        Message::NewJob(job) => {
                            println!("Worker {} got a job; executing.", id);
                            job.call_box();
                        },
                        Message::Terminate => {
                            println!("Worker {} was told to terminate.", id);
                            break;
                        },
                    }
                }
            });
    
            Worker {
                id,
                thread: Some(thread),
            }
        }
    }
    
    ...
    // 각 worker 스레드에 join 을 호출하기 전에 Message::Terminate를 전달한다.
    impl Drop for ThreadPool {
        fn drop(&mut self) {
            println!("Sending terminate message to all workers.");
    
            for _ in &mut self.workers {
                self.sender.send(Message::Terminate).unwrap();
            }
    
            println!("Shutting down all workers.");
    
            for worker in &mut self.workers {
                println!("Shutting down worker {}", worker.id);
    
                if let Some(thread) = worker.thread.take() {
                    thread.join().unwrap();
                }
            }
        }
    }