Feed 分页查询¶
1. 概述¶
FeedApi 提供用户动态(Feed)的分页查询能力。通过它可以获取指定用户发布的媒体列表,支持手动分页(逐页获取,自行控制游标)和自动全量分页(一次调用获取全部内容)两种模式。
FeedApi 同时支持 Client(Android 平台)和 WebClient(Web 平台),行为完全一致。
注意:查询私密账号的 Feed 需要已登录且与对方有关注关系,否则会抛出
PermissionError。
2. 快速开始¶
获取用户最新一页动态¶
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
user_id = 123456789
async def main():
# 获取第一页(最多约 12 条)
items, next_cursor = await feed_api.user(user_id)
for media in items:
print(media.id, media.caption_text)
print("下一页游标:", next_cursor) # None 表示已是最后一页
asyncio.run(main())
自动获取全部动态¶
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
user_id = 123456789
async def main():
# 自动分页,默认最多获取 100 条
all_items = await feed_api.user_all(user_id)
print(f"共获取 {len(all_items)} 条动态")
asyncio.run(main())
3. API 参考¶
3.1 feed_api.user()¶
获取用户 Feed 的单页数据,需要手动传递游标以翻页。
函数签名
参数
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
user_id |
int |
是 | — | 目标用户的数字 ID |
max_id |
str \| None |
否 | None |
分页游标。None 表示从第一页开始 |
返回值
返回一个二元组 (items, next_cursor):
| 字段 | 类型 | 说明 |
|---|---|---|
items |
list[Media] |
当前页的媒体列表 |
next_cursor |
str \| None |
下一页的游标。None 表示当前已是最后一页 |
异常
| 异常类型 | 触发条件 |
|---|---|
PermissionError |
未登录,或目标账号为私密账号且未互相关注 |
KeyError |
用户 ID 不存在 |
RuntimeError |
速率限制触发 |
示例
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
async def main():
# 获取第一页
items, cursor = await feed_api.user(123456789)
# 用游标获取第二页
if cursor:
items2, cursor2 = await feed_api.user(123456789, max_id=cursor)
asyncio.run(main())
3.2 feed_api.user_all()¶
自动翻页,一次调用获取用户全部动态(或指定数量上限)。内部循环调用 user(),无需手动管理游标。
函数签名
参数
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
user_id |
int |
是 | — | 目标用户的数字 ID |
limit |
int |
否 | 100 |
最大获取条数。0 表示不限制数量(见下方说明) |
limit 参数说明
limit=100(默认):最多返回 100 条,内部自动翻页直到达到上限或无更多内容limit=0:不限制返回数量,持续翻页直到获取全部内容,内部最多执行 500 次分页请求(约 6000 条),超出后抛出RuntimeErrorlimit=N:最多返回前 N 条媒体
返回值
| 类型 | 说明 |
|---|---|
list[Media] |
按时间倒序排列的媒体列表(最新内容在最前) |
异常
| 异常类型 | 触发条件 |
|---|---|
PermissionError |
未登录,或目标账号为私密账号且未互相关注 |
KeyError |
用户 ID 不存在 |
RuntimeError |
分页次数超过 500 次上限,或速率限制触发 |
示例
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
async def main():
# 获取最新 50 条
items = await feed_api.user_all(123456789, limit=50)
# 获取全部内容(不设上限)
all_items = await feed_api.user_all(123456789, limit=0)
print(f"用户共有 {len(all_items)} 条动态")
asyncio.run(main())
4. 分页机制图解¶
手动分页的工作流程如下:
flowchart TD
A([开始]) --> B["调用 user(user_id, max_id=None)"]
B --> C{返回 next_cursor?}
C -- "next_cursor = None" --> D([结束:已是最后一页])
C -- "next_cursor = '游标字符串'" --> E[处理当前页 items]
E --> F["调用 user(user_id, max_id=next_cursor)"]
F --> C
user_all() 内部封装了上述循环,无需手动实现。
5. 代码示例¶
5.1 手动分页遍历全部动态¶
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
user_id = 123456789
async def main():
cursor = None
all_items = []
page = 1
while True:
items, cursor = await feed_api.user(user_id, max_id=cursor)
all_items.extend(items)
print(f"第 {page} 页:获取到 {len(items)} 条,累计 {len(all_items)} 条")
page += 1
if cursor is None:
break # 没有更多内容
print(f"遍历完毕,共 {len(all_items)} 条动态")
asyncio.run(main())
5.2 使用 user_all() 自动分页¶
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
async def main():
# 获取全部,最多 500 次翻页
all_items = await feed_api.user_all(123456789, limit=0)
for media in all_items:
print(f"[{media.media_type}] {media.id} - 点赞: {media.like_count}")
asyncio.run(main())
5.3 只获取最近 N 条¶
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
async def main():
# 只需要最近 30 条
recent = await feed_api.user_all(123456789, limit=30)
print(f"最近 {len(recent)} 条动态:")
for m in recent:
print(f" {m.id} 赞: {m.like_count} 评论: {m.comment_count}")
asyncio.run(main())
5.4 断点续传(保存游标到文件)¶
适合长时间运行任务,中途中断后可从上次位置继续:
import asyncio
import json
import os
from igapi import AccountInfo, Client
CURSOR_FILE = "feed_cursor.json"
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
user_id = 123456789
async def main():
# 读取上次游标
cursor = None
if os.path.exists(CURSOR_FILE):
with open(CURSOR_FILE, "r") as f:
data = json.load(f)
cursor = data.get("cursor")
print(f"从断点继续,游标: {cursor}")
else:
print("从头开始获取")
collected = []
try:
while True:
items, next_cursor = await feed_api.user(user_id, max_id=cursor)
collected.extend(items)
# 保存当前游标
with open(CURSOR_FILE, "w") as f:
json.dump({"cursor": next_cursor}, f)
cursor = next_cursor
print(f"已获取 {len(collected)} 条...")
if cursor is None:
break
except KeyboardInterrupt:
print(f"\n中断,已保存游标到 {CURSOR_FILE}")
print(f"本次获取 {len(collected)} 条")
# 完成后清理游标文件
if os.path.exists(CURSOR_FILE):
os.remove(CURSOR_FILE)
print(f"全部完成,共 {len(collected)} 条")
asyncio.run(main())
6. 注意事项¶
-
默认返回数量:
user_all()默认limit=100,这是常见场景下的合理上限。如需获取更多,显式传入更大的值或limit=0。 -
500 次分页上限:
limit=0时,内部最多执行 500 次分页,约对应 6000 条内容。超出后抛出RuntimeError。大多数活跃账号的历史内容不会超过此限制。 -
速率限制:Instagram 会对频繁的 Feed 请求进行限速。建议在相邻页请求之间适当等待(如
time.sleep(1)),尤其是在limit=0的全量获取场景。 -
私密账号:查询私密账号的 Feed 要求客户端已登录,且与目标账号存在关注关系。否则将抛出
PermissionError。 -
WebClient 同样支持:
WebClient也可以创建FeedApi,用法与Client完全相同:feed_api = web_client.feed()。
7. 常见问题¶
Q:如何知道用户的数字 ID?
可通过 UserApi 先查询用户名获取 ID:
import asyncio
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
async def main():
user_api = client.user()
user_id = await user_api.id_from_username("目标用户名")
print(f"用户 ID: {user_id}")
asyncio.run(main())
Q:user() 每页返回多少条内容?
每页返回条数由 Instagram 服务端决定,通常在 12 到 33 条之间,无法在客户端控制。如需精确条数,使用 user_all(limit=N) 获取前 N 条后截断。
Q:触发速率限制后该怎么办?
捕获 RuntimeError 并等待一段时间后重试:
import asyncio
import time
from igapi import AccountInfo, Client
account = AccountInfo.parse("用户名:密码||设备信息|cookies||")
client = Client(account=account)
feed_api = client.feed()
user_id = 123456789
async def main():
cursor = None
try:
items, cursor = await feed_api.user(user_id, max_id=cursor)
except RuntimeError as e:
if "速率限制" in str(e) or "rate" in str(e).lower():
print("触发速率限制,等待 60 秒后重试...")
time.sleep(60)
else:
raise
asyncio.run(main())