diff --git a/blog/2024-10-07-email-triggers/index.mdx b/blog/2024-10-07-email-triggers/index.mdx new file mode 100644 index 000000000..d95c800f0 --- /dev/null +++ b/blog/2024-10-07-email-triggers/index.mdx @@ -0,0 +1,496 @@ +--- +slug: smtp-server +authors: [hugocasa] +tags: ['smtp', 'email', 'rust', 'tcp', 'tls'] +description: 'In this article, we will implement a simple SMTP server in Rust for receiving emails.' +image: ./smtp_rust.png +title: Implementing an SMTP server in Rust +--- + +import DocCard from '@site/src/components/DocCard'; + +# Implementing an SMTP server in Rust + +In this article, we will implement a simple SMTP server in [Rust](https://www.rust-lang.org/) for receiving emails. +The code is a slightly modified version taken directly from the [Windmill codebase](https://github.com/windmill-labs/windmill/pull/4163), where it is used to trigger scripts and flows using emails. + +![Implementing an SMTP server in Rust](./smtp_rust.png "Implementing an SMTP server in Rust") + +## What is SMTP + +SMTP ([Simple Mail Transfer Protocol](https://www.cloudflare.com/learning/email-security/what-is-smtp/)) defines the protocol for transmitting emails. +The general flow of an email from sender to recipient is as follows: +- The sender's email client sends the email to its configured SMTP server (e.g., Apple Mail to smtp.gmail.com for a Gmail **sender's** address). +- The sender's SMTP server then checks the recipient's email domain and sends the email to the corresponding SMTP server. + +Note: The recipient's email client then retrieves the email from the recipient's SMTP server, typically using [POP or IMAP](https://support.microsoft.com/en-us/office/what-are-imap-and-pop-ca2c5799-49f9-4079-aefe-ddca85d5b1c9). + +The usual flow involves two SMTP exchanges: + +`sender_email_client \<-> sender_smtp_server \<-> recipient_smtp_server` + +In this article, we will focus on implementing the receiving part of the SMTP server, specifically receiving emails from other SMTP servers. The protocol for receiving emails from either email clients or other SMTP servers is the same; the only difference lies in the port used and the security process, which we will [cover later](#conclusion). + +A receiving SMTP server listens on a port for incoming TCP connections. Once connected, the sender's SMTP server sends commands to the receiving SMTP server. Our server needs to interpret these commands and respond appropriately. + +Once the email content is received, we can process or store it as needed. [In the context of Windmill](#what-is-windmill), once we receive an email, we check the email address and trigger the appropriate runnable (script, flow) with the email content as an argument. + +When SMTP servers communicate, they typically do so on [port 25](https://www.cloudflare.com/learning/email-security/smtp-port-25-587/), and by default, this connection is not encrypted. Therefore, we will also implement [STARTTLS](#starttls), which upgrades the connection to a secure TLS connection, ensuring that the email content is encrypted in transit. + +Note: Modern email clients (e.g., when sending an email from your computer) connect to SMTP servers on port 587 (or 465) directly over TLS. + +## What is Windmill + +For context, [Windmill](/) is an open-source workflow engine and developer platform. It's an alternative to the likes of Retool, Superblocks, n8n, Airflow, Prefect, and Temporal, designed to build comprehensive internal tools (endpoints, workflows, UIs). + +This article was written following the implementation of the [Email triggers](/docs/advanced/email_triggers) feature, which allows scripts and flows to be triggered by sending emails to a specific email address. + +
+ + +
+ +## Listening for incoming connections + +Let's use [tokio](https://tokio.rs/) to listen for TCP connections on port 2525 (in production, it should be exposed on port 25, which we will discuss later). +We handle each incoming connection in a separate task using [`tokio::spawn`](https://tokio.rs/tokio/tutorial/spawning) to avoid blocking the listener thread. + +```rust +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}; +use std::net::SocketAddr; + +async fn listen() -> anyhow::Result<()> { + let addr = SocketAddr::from(([127, 0, 0, 1], 2525)); + let listener = TcpListener::bind(addr).await?; + + tracing::info!("SMTP server listening on {}", addr); + + loop { + match listener.accept().await { + Ok((stream, _)) => { + tokio::spawn(async move { + if let Err(err) = handle_connection(stream).await { + tracing::error!("Error handling SMTP connection: {:?}", err); + }; + }); + } + Err(err) => { + tracing::error!("Error establishing SMTP connection: {:?}", err); + } + } + } +} +``` + +After establishing the connection, the server needs to send a `220 My SMTP server` response to the client. +This response indicates the server is ready to accept and process further commands from the client. +The text after the code `My SMTP server` can be customized. + +```rust +async fn handle_connection(stream: &mut TcpStream) { + let (reader, writer) = stream.split(); + let mut reader = BufReader::new(reader); + let mut writer = BufWriter::new(writer); + writer.write_all(b"220 My SMTP server\r\n").await?; + writer.flush().await?; + + // handle session +} +``` + +## Implementing the SMTP exchange loop + +Once the sender (server or client) connects, it begins sending commands. +You can find the full list of possible commands in [RFC 5321](https://datatracker.ietf.org/doc/html/rfc5321#section-4). + +A simplified flow of commands could be: + +``` +- Sender: EHLO +- Receiver: 250 OK +- Sender: MAIL FROM: \ +- Receiver: 250 OK +- Sender: RCPT TO: \ +- Receiver: 250 OK +- Sender: DATA +- Receiver: 354 End data with \\.\\ +- Sender: Subject: Test email +- Sender: Test email content +- Sender: . +- Receiver: 250 OK +- Sender: QUIT +- Receiver: 221 Bye +``` + +The sequence of commands is straightforward: the client begins with a greeting, specifies the sender and recipient, and then transmits the email content. +To transmit the email content, the client issues the DATA command. Everything sent after this command is considered part of the email content until a single dot `.` on a line by itself signals the end of the email. +Upon receiving the complete email, the receiver responds with a 250 OK if the email was successfully received. +Finally, the client sends the QUIT command to terminate the connection. + +The following code implements the SMTP exchange. +It processes commands one by one, with commands separated by a line break `\r\n`. +At the start, the state is `SmtpState::Command`, we then process the command and depending on it, we store the received data if any, and update the state. +We have three states: +- `SmtpState::Command`: The state after the connection is established and the client is waiting for a command. +- `SmtpState::Data`: The state after the DATA command has been received and the email content is being accumulated. +- `SmtpState::Quit`: The state after the QUIT command has been received, the connection is closed. + +We use `Framed` with `LinesCodec` from the [tokio-util](https://crates.io/crates/tokio-util) crate to split the stream by lines. +We use regular expressions to parse the sender and receiver addresses. + +For each command, we check if it is valid and respond accordingly. +If the command is valid, we send a response to the client with the appropriate code from [RFC 5321](https://datatracker.ietf.org/doc/html/rfc5321#section-4.2.2). +The message following the code is customizable. +If the command results in an error, such as an invalid command or attempting to send an email before specifying the recipient, we send the corresponding error code and message to the client. + +```rust +use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; +use futures::{stream::iter, SinkExt, StreamExt}; + +async fn handle_session(mut stream: TcpStream) -> anyhow::Result<()> { + let RE_SMTP_MAIL = regex::Regex::new(r"(?i)from: ?<(.+)>").unwrap(); + let RE_SMTP_RCPT = regex::Regex::new(r"(?i)to: ?<(.+)>").unwrap(); + let mut message = String::new(); + let mut state = SmtpState::Command; + let mut mailfrom: Option = None; + let mut rcpts: Vec = Vec::new(); + let mut framed = Framed::new(stream, LinesCodec::new()); + while let Some(line_str) = framed.next().await { + let line = line_str?; + match state { + SmtpState::Command => { + let space_pos = line.find(" ").unwrap_or(line.len()); + let (command, arg) = line.split_at(space_pos); + let arg = arg.trim(); + match &*command.trim().to_uppercase() { + "HELO" | "EHLO" => { + send_commands(&mut framed, vec!["250 Hello".to_string()]).await?; + } + "MAIL" => { + // Handle MAIL FROM command + if let Some(address) = RE_SMTP_MAIL.captures(arg).and_then(|cap| cap.get(1)) { + mailfrom = Some(address.as_str().to_string()); + send_commands(&mut framed, vec!["250 OK".to_string()]).await?; + } else { + send_commands(&mut framed, vec!["501 Syntax: MAIL From:
".to_string()]).await?; + } + } + "RCPT" => { + // Handle RCPT TO command + if mailfrom.is_none() { + send_commands(&mut framed, vec!["503 Error: Send MAIL first".to_string()]).await? ; + } else { + if let Some(address) = RE_SMTP_RCPT.captures(arg).and_then(|cap| cap.get(1)) { + rcpts.push(address.as_str().to_string()); + send_commands(&mut framed, vec!["250 OK".to_string()]).await?; + } else { + send_commands(&mut framed, vec!["501 Syntax: RCPT TO:
".to_string()]).await?; + } + } + } + "DATA" => { + if rcpts.is_empty() { + send_commands(&mut framed, vec!["503 Error: MAIL FROM and RCPT TO must be set before sending DATA".to_string()]).await?; + } else { + state = SmtpState::Data; + send_commands(&mut framed, vec!["354 End data with .".to_string()]).await?; + } + } + "NOOP" => { + send_commands(&mut framed, vec!["250 OK".to_string()]).await? ; + } + "RSET" => { + mailfrom = None; + rcpts = Vec::new(); + message = String::new(); + send_commands(&mut framed, vec!["250 OK".to_string()]).await?; + } + "QUIT" => { + send_commands(&mut framed, vec!["221 Bye".to_string()]).await?; + state = SmtpState::Quit; + } + _ => { + send_commands(&mut framed, vec!["500 Unknown command".to_string()]).await?; + } + } + } + SmtpState::Data => { + if line.trim() == "." { + // The end of the email content has been received + send_commands(&mut framed, vec!["250 OK".to_string()]).await?; + // reset the state and variables for the next email + mailfrom = None; + rcpts = Vec::new(); + message = String::new(); + state = SmtpState::Command; + // we can now handle the email: + handle_email(mailfrom, rcpts, message); + } else { + // Add the received line to the email content + message.push_str(&line); + message.push_str("\n"); + } + } + SmtpState::Quit => { + break; + } + } + } +} +``` + +The `send_commands` function is a helper to send a line of text to the client. + +```rust +async fn send_commands( + framed: &mut Framed, LinesCodec>, + commands: Vec, +) -> anyhow::Result<()> { + // only need to add \r because the codec only adds \n + let messages = iter(commands.into_iter().map(|x| format!("{}\r", x))); + framed.send_all(&mut messages.map(Ok)).await?; + Ok(()) +} +``` + +## STARTTLS + +As explained earlier, to secure the communication between SMTP servers, we should use STARTTLS ([RFC 3207](https://www.ietf.org/rfc/rfc3207.txt)). +This protocol allows an existing insecure connection to be upgraded to a [TLS connection](https://www.cloudflare.com/learning/ssl/transport-layer-security-tls/). + +The flow begins the same way as before but the server specifies that it supports TLS by sending the `250-STARTTLS` response to the `EHLO` command. +The client, recognizing that the server supports STARTTLS, sends the `STARTTLS` command. +The server responds with `220 GO ON` and then both the client and server perform the TLS handshake. +Once the handshake is complete, the usual SMTP exchange can take place on this TLS encrypted connection. +As we only want to receive emails when the connection is encrypted, we will not accept any of the `MAIL`, `RCPT` or `DATA` commands before the handshake is complete. +Once complete, we switch to the loop from `handle_session` describe above. + +```rust +async fn handle_unsecured_session( + reader: &mut BufReader, + writer: &mut BufWriter, +) -> anyhow::Result<()> { + let mut is_tls = false; + let mut line = String::new(); + while reader.read_line(&mut line).await? != 0 { + let space_pos = line.find(" ").unwrap_or(line.len()); + let (command, _) = line.split_at(space_pos); + + match command.trim().to_uppercase().as_ref() { + "EHLO" | "HELO" => { + writer.write_all(b"250-windmill Hello\r\n").await?; + writer.write_all(b"250-STARTTLS\r\n").await?; + writer.write_all(b"250 What you've got?\r\n").await?; + writer.flush().await?; + } + "STARTTLS" => { + writer.write_all(b"220 GO ON\r\n").await?; + writer.flush().await?; + is_tls = true; + break; + } + "QUIT" => { + writer.write_all(b"221 Have a nice day!\r\n").await?; + writer.flush().await?; + break; + } + "NOOP" => { + writer.write_all(b"250 OK\r\n").await?; + writer.flush().await?; + } + "MAIL" | "RCPT" | "DATA" | "RSET" => { + writer + .write_all(b"530 Must issue a STARTTLS command first\r\n") + .await?; + writer.flush().await?; + } + _ => { + writer.write_all(b"500 Unknown command\r\n").await?; + writer.flush().await?; + } + } + + line.clear(); + } + + if is_tls { + handle_starttls(stream).await?; + } +} +``` + +The `handle_starttls` function manages the TLS handshake. We use the [native-tls](https://crates.io/crates/native-tls) crate along with [tokio-native-tls](https://crates.io/crates/tokio-native-tls) for this purpose. +You will need a certificate for the handshake; depending on your use case, you can either use a self-signed certificate or one from a trusted provider. + +```rust +use native_tls::{Identity, TlsAcceptor}; +use tokio_native_tls::{TlsAcceptor as TokioTlsAcceptor, TlsStream}; + +async fn handle_starttls( + stream: &mut TcpStream, +) -> anyhow::Result<()> { + // ideally the certificate should only be loaded from here and not generated each time + let (pem_certificate, pem_private_key) = generate_certificate()?; + let identity = Identity::from_pkcs8(&pem_certificate, &pem_private_key)?; + let tls_acceptor = TlsAcceptor::builder(identity).build()?; + let tls_acceptor = TokioTlsAcceptor::from(tls_acceptor); + + match tls_acceptor.accept(stream).await { + Ok(stream) => { + // we can now handle the normal SMTP session + handle_session(stream).await?; + } + Err(e) => { + tracing::error!("Error establishing SMTP TLS connection: {:?}", e); + } + }; +} +``` + +Here's the code to generate a self-signed certificate using the [openssl](https://crates.io/crates/openssl) crate: + +```rust +use openssl::{ + asn1::Asn1Time, + pkey::PKey, + rsa::Rsa, + x509::{ + extension::{AuthorityKeyIdentifier, BasicConstraints, SubjectKeyIdentifier}, + X509NameBuilder, X509, + }, +}; +fn generate_certificate() -> anyhow::Result<(String, String)> { + let cert_result = { + let rsa = Rsa::generate(4096)?; + let pkey = PKey::from_rsa(rsa)?; + let mut name = X509NameBuilder::new()?; + name.append_entry_by_text("CN", "localhost")?; + let name = name.build(); + let mut builder = X509::builder()?; + builder.set_version(2)?; + builder.set_subject_name(&name)?; + builder.set_issuer_name(&name)?; + builder.set_pubkey(&pkey)?; + let now = Asn1Time::days_from_now(0)?; + let later = Asn1Time::days_from_now(3650)?; + builder.set_not_before(now.as_ref())?; + builder.set_not_after(later.as_ref())?; + builder.append_extension(BasicConstraints::new().critical().ca().build()?)?; + builder.append_extension(SubjectKeyIdentifier::new().build(&builder.x509v3_context(None, None))?)?; + builder.append_extension(AuthorityKeyIdentifier::new().keyid(true).issuer(true).build(&builder.x509v3_context(None, None))?)?; + builder.sign(&pkey, openssl::hash::MessageDigest::sha256())?; + let c = builder.build(); + Ok((c.to_pem()?, pkey.private_key_to_pem_pkcs8()?)) + } + let (pem_certificate, pem_private_key) = cert_result + .as_ref() + .map_err(|e| anyhow::anyhow!("Could not generate self-signed certificates: {}", e))?; + + Ok((pem_certificate, pem_private_key)) +} +``` + + +## Testing + +You can test that your server works by using [`nc`](https://linuxize.com/post/netcat-nc-command-with-examples/): + +```bash +nc localhost 2525 +``` + +You should see the `220 My SMTP server` response. +You can then send commands to your server and observe the responses. +Note that you cannot upgrade to TLS from `nc`, so sending the `STARTTLS` command via `nc` won't work. + +However, you can test the TLS handshake and send emails using `openssl`: + +```bash +openssl s_client -starttls smtp -connect localhost:2525 +``` + +This command connects to the server, sends `STARTTLS`, and upgrades the connection to TLS. +You can then send commands to the server and send emails. + +Although I haven't personally used it, [swaks](https://www.jetmore.org/john/code/swaks/) is a popular tool to test SMTP servers. + +The command line is useful for testing, but it’s often more practical to use a Python script. +Here's a simple example using the [`smtplib`](https://docs.python.org/3/library/smtplib.html) library: + +```python +smtp_server = "localhost" +smtp_port = 2525 +from_email = "you@example.com" +to_email = "to@example.com" +subject = "Test Email" +body = "This is a test email sent to my local SMTP server on port 2525." +html_body = "This is a HTML" + +# Create the email +message = MIMEMultipart() +message["From"] = from_email +message["To"] = to_email +message["Subject"] = subject + +message.attach(MIMEText(body, "plain")) +message.attach(MIMEText(html_body, "html")) + +# to add an attachment: +# file_path = "myfile.txt" +# with open(file_path, "rb") as attachment: +# part = MIMEBase("application", "octet-stream") +# part.set_payload(attachment.read()) +# encoders.encode_base64(part) +# part.add_header( +# "Content-Disposition", +# f"attachment; filename= {file_path}", +# ) +# message.attach(part) + +# Send the email +try: + with smtplib.SMTP(smtp_server, smtp_port) as server: + server.starttls() + server.sendmail(from_email, to_email, message.as_string()) + print("Email sent successfully") +except Exception as e: + print(f"Failed to send email: {e}") +``` + +## Deploying + +You need to expose the server externally on port 25. You can use a reverse proxy to redirect incoming SMTP connections to your server running on port 2525. +For instance, we use [Caddy](https://caddyserver.com/) with a [Layer 4 extension](https://github.com/mholt/caddy-l4) to support TCP proxying. Our Dockerfile is available [here](https://github.com/windmill-labs/windmill/blob/main/docker/DockerfileCaddyL4), the image [here](https://github.com/windmill-labs/windmill/pkgs/container/caddy-l4) and the Caddyfile [here](https://github.com/windmill-labs/windmill/blob/main/Caddyfile). + +You will also need to configure your DNS settings to point to your server. + +Your DNS configuration requires two entries: +- An `A record` that points to your server's IP address (e.g. smtp.yourdomain.com) +- An `MX record` from your domain (e.g. yourdomain.com in myaddress@yourdomain.com) to your server A record (e.g. smtp.yourdomain.com). + +These settings will inform other SMTP servers where to send emails for your domain. + +You can now test the server by sending an email from your personal email account to an address with the domain you configured in the DNS settings. Afterward, verify that your SMTP server successfully receives the email. + +## Conclusion + +We've implemented a simple SMTP server in Rust that can securely receive emails over TLS from other SMTP servers. +This setup provides the core functionality for receiving emails, allowing you to handle them as needed. You can store the emails, access them via your own API, or implement any custom logic based on your specific requirements. +You can also extend this setup to complete the SMTP server and enable it to send emails. For receiving emails directly from clients, you can reuse the same logic, simply adjusting the ports (e.g., port 587 or 465) and ensuring a secure connection from the start. +To send emails, you'll need to implement the logic for initiating SMTP commands to other servers, which follows a similar process to receiving but in reverse, with your server acting as the sender. + +If you're interested in triggering scripts using email, check out [Windmill](https://windmill.dev). +Windmill is an open-source platform and workflow engine for developers to execute jobs. +It supports on-demand execution via an auto-generated UI, API calls, scheduling, event triggers (like email), and more. +We use code very similar to the one above to trigger Windmill scripts or flows using email. + +Thank you for reading! If you have any feedback or questions, feel free to contact us on our [Discord](https://discord.com/invite/V7PM2YHsPB). \ No newline at end of file diff --git a/blog/2024-10-07-email-triggers/smtp_rust.png b/blog/2024-10-07-email-triggers/smtp_rust.png new file mode 100644 index 000000000..e7db3f3c3 Binary files /dev/null and b/blog/2024-10-07-email-triggers/smtp_rust.png differ diff --git a/blog/authors.yml b/blog/authors.yml index e3e3794a3..f5cca046e 100644 --- a/blog/authors.yml +++ b/blog/authors.yml @@ -58,7 +58,7 @@ zegoverno: image_url: https://media.licdn.com/dms/image/C4D03AQE2yXYlSoBQmg/profile-displayphoto-shrink_800_800/0/1594600574101?e=2147483647&v=beta&t=c2VZb77tUdnBRzlGVTuY23qDvGlUHy1xqy_zn1TNr40 hugocasa: name: Hugo Casademont - title: LLM Research Engineer + title: Founding Engineer url: https://github.com/hugocasa image_url: https://github.com/hugocasa.png sindresvendby: @@ -79,4 +79,4 @@ romaricphilogene: edwindmiller: name: Edwind Miller title: Contributor from the Windmill Community - image_url: '/team/edwind_miller.jpg' \ No newline at end of file + image_url: '/team/edwind_miller.jpg'