Создание простого RESTful API для блога с помощью Flask и sqlite3
Написание API в первый раз может оказаться сложной задачей. К счастью, у нас есть очень простые в использовании инструменты, такие как Flask и sqlite3, которые упрощают этот процесс. Как и составление списка дел, API для блог-приложения может укрепить основы создания API.
Что такое Flask?
Flask – это модуль Python, который позволяет легко разрабатывать веб-приложения – веб-фреймворк. Веб-фреймворк – это инструмент, который позволяет разработчикам создавать веб-приложения, не заботясь о низкоуровневых деталях веб-сервера.
Что такое sqlite3?
sqlite – это реляционная система управления с открытым исходным кодом, не требующая серверной настройки. Это делает ее простой в настройке и использовании.
Установка
В рамках данного руководства я не буду рассматривать процесс установки python. Я также предположу, что вы уже установили IDE по своему вкусу.
Если вам нужна помощь в установке python на вашу машину, пожалуйста, обратитесь к следующим ссылкам: Apple, Windows
Скачать Visual Studio Code можно здесь.
Проверьте версию python с помощью следующей команды, я буду использовать python3:
~ % python3 -V
Python 3.11.4
Настройка
Виртуальная среда – это автономное рабочее пространство внутри машины, которое позволяет разработчикам управлять зависимостями и установкой пакетов независимо от общесистемной среды. Считайте, что это “пузырь” вокруг данного приложения, поэтому все, что в нем содержится, предназначено для использования в данном конкретном приложении. Модуль ‘venv’ является частью установки python, начиная с python3.3.
Давайте создадим папку, в которую поместим наш проект, и создадим виртуальное окружение в этой директории:
% mkdir blog-tut #make a folder called 'blog-tut'
% cd blog-tut #change directory into 'blog-tut'
% python3 -m venv . #create a virtual environment within the folder
# the "." in the previous statement references the curent folder
Теперь, когда мы инициализировали виртуальную среду, у нас должно появиться несколько новых папок:
% ls
bin include lib pyvenv.cfg
Теперь нам нужно “активировать” это окружение. В каталоге ‘bin’ есть скрипт ‘activate’, который мы можем запустить с помощью:
% . bin/activate
(blog-tut) %
После активации окружения вы увидите его название в командной строке. Давайте установим Flask в наше окружение, прежде чем двигаться дальше. Для этого вам понадобится pip, менеджер пакетов python (подробнее о нем вы можете прочитать здесь). Поскольку мы используем виртуальное окружение, у нас уже должен быть установлен pip.
(blog-tut) % pip3 install flask
Наконец, мы будем использовать Postman для тестирования нашего API. Вы можете установить Postman здесь. Подробнее об этом позже.
Создание API
Теперь, когда мы все установили, мы готовы написать немного кода! Откройте папку в вашей IDE – я буду использовать VSCode. Создайте новый файл в той же папке, назовите его “main.py” и откройте его.
Прежде чем мы сможем использовать любые модули, которые мы установили, нам нужно будет их импортировать. В данном случае это касается flask и sqlite3. В следующем коде мы импортируем flask и sqlite3, а также убедимся, что наш веб-сервер запущен.
import sqlite3
import json
from flask import Flask, request, Response
app = Flask(__name__)
@app.get("/hello")
def hello_world():
return 'Hello World!'
app.run(host='localhost', port=5040)
В приведенном выше коде мы:
- импортируем модули sqlite3 и flask
- создаем экземпляр веб-приложения Flask, name здесь ссылается на текущий модуль python (это тема для другого раза)
- напишем конечную точку HTTP GET, определяющую функцию, которая возвращает “Hello World!”.
- запустите наше приложение flask на порту 5040 – “localhost” в более простых терминах означает “этот компьютер”.
Теперь запустите приложение и зайдите на сайт http://localhost:5040/hello в вашем браузере.
(blog-tut) blog-tut % python3 main.py
И вы увидите следующее:
Теперь давайте создадим шаблон для остальной части нашего приложения и пройдемся по каждому методу и заполним .
app = Flask(__name__)
@app.get("/hello")
def hello_world():
return 'Hello World!'
#post a blog
@app.post("/postBlog")
def post_blog():
return
def post_blog_handler(userId, blogTitle, blogContent, authorName, date):
return
#get all blogs
@app.get("/blogs")
def get_blogs():
return
def get_blogs_handler():
return
#create a database
@app.post("/createTable")
def create_database():
return
def create_database_handler(tableName, fields):
return
app.run(host='localhost', port=5040)
Сначала разберемся с обработчиком create_database_handler. Мы не будем слишком глубоко погружаться в SQL-команды, но они очень просты. Мы хотим создать базу данных, в которой будут храниться наши данные. Мы сделаем это с помощью модуля sqlite3.
def create_database_handler(tableName, fields):
cnx = sqlite3.connect("blog.db") #connect to the database file
cur = cnx.cursor() # create a cursor to perform operations on the database conneciton
try:
fields = ', '.join('"' + item + '"' for item in fields) #surround each item in the fields in quotes and separate them using commas
query = "CREATE TABLE " + tableName + "(" + fields + ")" # construct the query
cur.execute(query) #execute the query
return True
except Exception as e: #if an error occured, the code will go here
print("Error occured: ", e) # print the error
return False #return false
В приведенном выше коде мы:
- создали соединение с файлом нашей базы данных
- создали курсор, который позволит нам выполнять операции с нашим соединением
- внутри блока try/except мы сначала разделим имена полей в таблице базы данных запятыми и заключим их в кавычки, а затем создадим SQL-запрос – например, если наша переменная “fields” представляет собой список со значениями [field1, field2, field3], мы сначала создадим строку в виде: “field1”, “field2”, “field3”, а затем добавить ее и имя таблицы в круглые скобки в запросе, чтобы получилось ‘CREATE TABLE tablename(“field1”, “field2”, “field3”)’.
Чтобы запустить этот метод с помощью конечной точки API ‘/createTable’, сделаем следующее:
@app.post("/createTable")
def create_database():
data = request.json
db_create_success = create_database_handler(data["db_name"], data["db_fields"])
if db_create_success:
return Response({'success: True'}, status=200)
return Response({'success: False'}, status=500)
Здесь происходят некоторые закулисные вещи. Если вернуться к нашему утверждению об импорте, то мы импортировали “request” – “from flask import Flask, request, Response”. Это позволяет нам получить доступ к данным в запросе, который мы получили на нашу конечную точку “/createTable”. Мы можем протестировать нашу конечную точку с помощью Postman. Запустите свой сервер, используя команду python из предыдущего раздела – вам нужно будет перезапустить его на этом этапе, чтобы код обновился – нажмите cmd+c, чтобы завершить процесс.
Postman
Создайте новую вкладку:
Щелкните на значке выпадающего списка и выберите ‘POST’ (это соответствует нашей конечной точке, принимающей POST-запросы (@app.post)). Затем введите нашу ссылку на localhost, за которой следует “/createTable”.
Теперь мы хотим отправить нашей конечной точке для разбора тип тела JSON. Перейдите на вкладку “Body” и выберите “raw”, а в выпадающем списке справа выберите JSON.
JSON – это тип данных, который мы можем использовать для отправки информации. Введите следующее в текстовое поле и отправьте.
{
"db_name":"blogs",
"db_fields":["userId", "blogTitle", "blogContent", "authorName", "date"]
}
Итак, что мы здесь сделали? Мы создали запрос “POST”, тот же тип запроса, который принимает наша конечная точка /createTable, создали объект JSON, присвоили “blogs” имя db_name и список полей “db_fields” и отправили запрос на нашу конечную точку. Если все прошло успешно, вы должны увидеть следующее в нижней части Postman:
Обратите внимание на две вещи: сообщение об успехе и ‘200 OK’ в правом верхнем углу. Обратимся к нашему коду, а именно к возвращаемому методу create_database:
return Response({'success: True'}, status=200)
Мы видим, что в случае успеха мы вернем ‘success: True” и статус 200. После этого в каталоге проекта должен появиться файл с названием ‘blog.db’.
Создание новой записи в блоге
Теперь мы завершим работу с методами ‘post_blog’ и ‘post_blog_handler’.
#make new blog post
@app.post("/postBlog")
def post_blog():
data = request.json
post_success = post_blog_handler(data["userId"], data["blogTitle"],
data["blogContent"], data["authorName"],
data["date"])
if post_success:
return Response({'success: true'}, status = 200)
return Response({'success: false'}, status = 500)
Здесь мы используем метод ‘request’ из flask, чтобы получить запрос, который будет отправлен на эту конечную точку, и преобразуем его в JSON. Затем мы используем информацию, содержащуюся в этом JSON-объекте, для вызова нашего метода-обработчика. Так как метод обработчика возвращает True или False, если post_success: будет оценен как true, если метод обработчика вернет true, в противном случае он будет false.
def post_blog_handler(userId, blogTitle, blogContent, authorName, date):
blog_info = [userId, blogTitle, blogContent, authorName, date]
fields = ', '.join('"' + item + '"' for item in blog_info)
query = "INSERT INTO blogs VALUES(" + fields + ")"
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
cur.execute(query)
cnx.commit()
return True
except Exception as e:
print("Error occured ", e)
return False
В методе обработчика сначала мы берем все предоставленные аргументы и вставляем их в список, заключая в кавычки и разделяя запятыми. Затем мы составляем SQL-запрос, подключаемся к базе данных и выполняем команду. Обратите внимание, что нам нужно зафиксировать изменения, cnx.commit(), это обновит нашу базу данных с изменениями.
Давайте вернемся в Postman и протестируем это.
Выполните те же действия, что и ранее в Postman, не забыв обновить конечную точку в ссылке на “/postBlog”, а также данные JSON:
В ответ вы снова увидите сообщение об успехе:
Теперь, когда мы успешно создали пост, нам нужен способ его получить. Мы можем сделать это с помощью конечной точки GET.
@app.get("/blogs")
def get_blogs():
blog_success = get_blogs_handler()
if blog_success[0]:
return Response(json.dumps(blog_success[1]), status = 200)
return Response({"success: false"}, status = 500)
def get_blogs_handler():
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
query = "SELECT * FROM blogs"
blogs = cur.execute(query)
all_blogs = blogs.fetchall()
return [True, all_blogs]
except Exception as e:
print("Error occurred ", e)
return [False, e]
Обратите внимание, что декоратор (оператор @ над определением функции) здесь другой – ‘@app.get’.
Метод-обработчик здесь выполняет почти все те же операции, что и раньше, с некоторыми отличиями. Во-первых, мы используем SELECT и ‘*’, чтобы получить все записи блога в нашей базе данных. Затем мы выполняем запрос и используем ‘fetchall()’, чтобы получить все записи в нашей базе данных. Наш оператор возврата здесь отличается, потому что мы должны не только вернуть ‘True’ или ‘False’, чтобы указать, успешно ли выполнился метод, но и вернуть записи в блогах, запрошенные из базы данных.
Как только наш обработчик выполнится и вернет результат, мы проверим, “если blog_success” имеет значение True или False на индексе 0 списка, который является первым индексом ([0,1]). Если это значение равно true, мы берем результат, полученный из базы данных по индексу 1, blog_success[1], и преобразуем его в объект JSON с помощью метода json.dumps, возвращая его и статус 200. Мы можем импортировать модуль json с помощью следующей команды, которая также включена в предыдущие операторы импорта.
import json
Вернувшись в Postman, отправьте GET-запрос на конечную точку ‘/blogs’.
И мы должны увидеть следующее:
Теперь у вас есть работающий API!
Вот полный исходный код с добавленными функциями удаления, обновления и получения записей блога с помощью методов автора.
import sqlite3
import json
from flask import Flask, request, Response
app = Flask(__name__)
#make new blog post
@app.post("/postBlog")
def post_blog():
data = request.json
post_success = post_blog_handler(data["userId"], data["blogTitle"],
data["blogContent"], data["authorName"],
data["date"])
if post_success:
return Response({'success: true'}, status = 200)
return Response({'success: false'}, status = 500)
def post_blog_handler(userId, blogTitle, blogContent, authorName, date):
blog_info = [userId, blogTitle, blogContent, authorName, date]
fields = ', '.join('"' + item + '"' for item in blog_info)
query = "INSERT INTO blogs VALUES(" + fields + ")"
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
cur.execute(query)
cnx.commit()
return True
except Exception as e:
print("Error occured ", e)
return False
#delete blog post
@app.route("/delete/<id>")
def delete_post(id):
delete_success = delete_post_handler(id)
if delete_success:
return Response({'success: true'}, status = 200)
return Response({'success: false'}, status = 500)
def delete_post_handler(id):
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
query = "DELETE FROM blogs WHERE id = '" + id + "'"
cur.execute(query)
return True
except Exception as e:
print("Error occurred", e)
return False
@app.route("/update", methods=['PATCH'])
def update_blog():
data = request.json
update_success = update_blog_handler(data['title'], data['content'])
if update_success:
return Response({"success: true"}, status = 200)
return Response({"success: false"}, status = 500)
def update_blog_handler(title, newContent):
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
query = "UPDATE blogs SET blogContent = '" + newContent + "' WHERE blogTitle = '" + title + "'"
cur.execute(query)
cnx.commit()
return True
except Exception as e:
print("Error occurred ", e)
return False
@app.get("/blogs")
def get_blogs():
blog_success = get_blogs_handler()
if blog_success[0]:
return Response(json.dumps(blog_success[1]), status = 200)
return Response({"success: false"}, status = 500)
def get_blogs_handler():
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
query = "SELECT * FROM blogs"
blogs = cur.execute(query)
all_blogs = blogs.fetchall()
return [True, all_blogs]
except Exception as e:
print("Error occurred ", e)
return [False, e]
@app.get("/author")
def get_blogs():
print(request.args.get("author"))
if request.args["author"]:
blog_ret = get_blogs_by_author_handler(request.args.get("author"))
print(blog_ret)
if blog_ret[0]:
return Response(json.dumps(blog_ret[1]), status = 200)
return Response({"success: false"}, status = 500)
def get_blogs_by_author_handler(author):
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
query = "SELECT * FROM blogs WHERE authorName = '" + author + "'"
blogs = cur.execute(query)
all_blogs = blogs.fetchall()
return [True, all_blogs]
except Exception as e:
print("Error occurred", e)
return [False, e]
@app.post("/createTable")
def create_database():
data = request.json
db_create_success = create_database_handler(data["db_name"], data["db_fields"])
if db_create_success:
c
return Response({'success: False'}, status=500)
def create_database_handler(tableName, fields):
cnx = sqlite3.connect("blog.db")
cur = cnx.cursor()
try:
fields = ', '.join('"' + item + '"' for item in fields)
print(fields)
query = "CREATE TABLE " + tableName + "(" + fields + ")"
print(query)
cur.execute(query)
return True
except Exception as e:
print("Error occured: ", e)
return False
app.run(host='0.0.0.0', port=5040)